<cite id="ffb66"></cite><cite id="ffb66"><track id="ffb66"></track></cite>
      <legend id="ffb66"><li id="ffb66"></li></legend>
      色婷婷久,激情色播,久久久无码专区,亚洲中文字幕av,国产成人A片,av无码免费,精品久久国产,99视频精品3
      網易首頁 > 網易號 > 正文 申請入駐

      GraphRAG進階:基于Neo4j與LlamaIndex的DRIFT搜索實現詳解

      0
      分享至

      微軟的GraphRAG算得上是最早一批成熟的GraphRAG系統,它把索引階段(抽取實體、關系、構建層級社區并生成摘要)和查詢階段的高級能力整合到了一起。這套方案的優勢在于,可以借助預先計算好的實體、關系、社區摘要來回答那些宏觀的、主題性的問題,這恰恰是傳統RAG系統基于文檔檢索難以做到的。



      本文的重點是DRIFT搜索:Dynamic Reasoning and Inference with Flexible Traversal,翻譯過來就是"動態推理與靈活遍歷"。這是一種相對較新的檢索策略,兼具全局搜索和局部搜索的特點。

      DRIFT的工作流程是這樣的:先通過向量搜索建立一個寬泛的查詢起點,再利用群信息把原始問題拆解成更細粒度的后續查詢。然后動態地在知識圖譜上游走,抓取實體、關系等局部細節。這種設計在計算效率和答案質量之間找到了一個不錯的平衡點。



      上圖為使用 LlamaIndex 工作流和 Neo4j 實現的 DRIFT 搜索,核心流程分一下幾步:

      首先是HyDE生成,基于一份樣例社區報告構造假設性答案,用來改善查詢的向量表示。

      接著社區搜索登場,通過向量相似度找出最相關的社區報告,給查詢提供宏觀上下文。系統會分析這些結果,輸出一個初步的中間答案,同時生成一批后續查詢用于深挖。

      這些后續查詢會在局部搜索階段并行執行,從知識圖譜里撈出文本塊、實體、關系、以及更多社區報告。這個過程可以迭代多輪,每輪都可能產生新的后續查詢。

      最后是答案生成,把過程中積累的所有中間答案匯總起來,融合社區級別的宏觀洞察和局部細節,生成最終響應。整體思路就是先鋪開、再聚焦,層層遞進。

      本文用的是《愛麗絲夢游仙境》,劉易斯·卡羅爾的經典作品,這部小說角色眾多、場景豐富、事件環環相扣,拿來演示GraphRAG的能力再合適不過。

      數據導入



      整個pipeline遵循標準的GraphRAG流程,分三個階段:

      class MSGraphRAGIngestion(Workflow):
      @step
      async def entity_extraction(self, ev: StartEvent) -> EntitySummarization:
      chunks = splitter.split_text(ev.text)
      await ms_graph.extract_nodes_and_rels(chunks, ev.allowed_entities)
      return EntitySummarization()
      @step
      async def entity_summarization(
      self, ev: EntitySummarization
      ) -> CommunitySummarization:
      await ms_graph.summarize_nodes_and_rels()
      return CommunitySummarization()
      @step
      async def community_summarization(
      self, ev: CommunitySummarization
      ) -> CommunityEmbeddings:
      await ms_graph.summarize_communities()
      return CommunityEmbeddings()

      先從文本塊里抽取實體和關系,再給節點和關系生成摘要,最后構建層級社區并生成社區摘要。

      摘要做完之后,要給社區和實體都生成向量嵌入,這樣才能支持相似性檢索。社區嵌入的代碼長這樣:

      @step
      async def community_embeddings(self, ev: CommunityEmbeddings) -> EntityEmbeddings:
      # Fetch all communities from the graph database
      communities = ms_graph.query(
      """
      MATCH (c:__Community__)
      WHERE c.summary IS NOT NULL AND c.rating > $min_community_rating
      RETURN coalesce(c.title, "") + " " + c.summary AS community_description, c.id AS community_id
      """,
      params={"min_community_rating": MIN_COMMUNITY_RATING},
      )
      if communities:
      # Generate vector embeddings from community descriptions
      response = await client.embeddings.create(
      input=[c["community_description"] for c in communities],
      model=TEXT_EMBEDDING_MODEL,
      )
      # Store embeddings in the graph and create vector index
      embeds = [
      {
      "community_id": community["community_id"],
      "embedding": embedding.embedding,
      }
      for community, embedding in zip(communities, response.data)
      ]
      ms_graph.query(
      """UNWIND $data as row
      MATCH (c:__Community__ {id: row.community_id})
      CALL db.create.setNodeVectorProperty(c, 'embedding', row.embedding)""",
      params={"data": embeds},
      )
      ms_graph.query(
      "CREATE VECTOR INDEX community IF NOT EXISTS FOR (c:__Community__) ON c.embedding"
      )
      return EntityEmbeddings()

      實體嵌入同理,這樣DRIFT搜索需要的向量索引就都建好了。

      DRIFT搜索

      DRIFT的檢索思路其實很符合簡單:先看大圖,再挖細節。它不會一上來就在文檔或實體層面做精確匹配,而是先去查群的摘要,因為這些摘要是對知識圖譜主要主題的高層次概括。

      拿到相關的高層信息后,DRIFT會智能地派生出后續查詢,去精確檢索特定實體、關系、源文檔。這種兩階段的做法其實很像人類查資料的習慣:先大致了解情況再針對性地追問細節。既有全局搜索的覆蓋面,又有局部搜索的精準度,而且不用把所有社區報告或文檔都過一遍,計算開銷控制得不錯。

      下面拆解一下各個階段的實現。

      群搜索

      DRIFT用了HyDE技術來提升向量檢索的準確率。不是直接拿用戶query做embedding,而是先讓模型生成一個假設性的答案,再用這個答案去做相似性搜索。道理很簡單:假設答案在語義上跟真實的摘要更接近。

      @step
      async def hyde_generation(self, ev: StartEvent) -> CommunitySearch:
      # Fetch a random community report to use as a template for HyDE generation
      random_community_report = driver.execute_query(
      """
      MATCH (c:__Community__)
      WHERE c.summary IS NOT NULL
      RETURN coalesce(c.title, "") + " " + c.summary AS community_description""",
      result_transformer_=lambda r: r.data(),
      )
      # Generate a hypothetical answer to improve query representation
      hyde = HYDE_PROMPT.format(
      query=ev.query, template=random_community_report[0]["community_description"]
      )
      hyde_response = await client.responses.create(
      model="gpt-5-mini",
      input=[{"role": "user", "content": hyde}],
      reasoning={"effort": "low"},
      )
      return CommunitySearch(query=ev.query, hyde_query=hyde_response.output_text)

      拿到HyDE query之后,做embedding,然后通過向量相似度撈出top 5的報告。接著讓LLM基于這些報告生成一個初步答案,同時識別出需要深挖的后續查詢。將初步答案存起來然后進行后續查詢全部并行分發到局部搜索階段。


      @step
      async def community_search(self, ctx: Context, ev: CommunitySearch) -> LocalSearch:
      # Create embedding from the HyDE-enhanced query
      embedding_response = await client.embeddings.create(
      input=ev.hyde_query, model=TEXT_EMBEDDING_MODEL
      )
      embedding = embedding_response.data[0].embedding
      # Find top 5 most relevant community reports via vector similarity
      community_reports = driver.execute_query(
      """
      CALL db.index.vector.queryNodes('community', 5, $embedding) YIELD node, score
      RETURN 'community-' + node.id AS source_id, node.summary AS community_summary
      """,
      result_transformer_=lambda r: r.data(),
      embedding=embedding,
      )
      # Generate initial answer and identify what additional info is needed
      initial_prompt = DRIFT_PRIMER_PROMPT.format(
      query=ev.query, community_reports=community_reports
      )
      initial_response = await client.responses.create(
      model="gpt-5-mini",
      input=[{"role": "user", "content": initial_prompt}],
      reasoning={"effort": "low"},
      )
      response_json = json_repair.loads(initial_response.output_text)
      print(f"Initial intermediate response: {response_json['intermediate_answer']}")
      # Store the initial answer and prepare for parallel local searches
      async with ctx.store.edit_state() as ctx_state:
      ctx_state["intermediate_answers"] = [
      {
      "intermediate_answer": response_json["intermediate_answer"],
      "score": response_json["score"],
      }
      ]
      ctx_state["local_search_num"] = len(response_json["follow_up_queries"])
      # Dispatch follow-up queries to run in parallel
      for local_query in response_json["follow_up_queries"]:
      ctx.send_event(LocalSearch(query=ev.query, local_query=local_query))
      return None

      這就是DRIFT的核心思路,先用HyDE增強的社區搜索鋪開,再用后續查詢往下鉆。

      局部搜索

      局部搜索階段把后續查詢并行跑起來,深入到具體細節。每個查詢通過實體向量檢索拿到目標上下文,生成中間答案,可能還會產出更多后續查詢。

      @step(num_workers=5)
      async def local_search(self, ev: LocalSearch) -> LocalSearchResults:
      print(f"Running local query: {ev.local_query}")
      # Create embedding for the local query
      response = await client.embeddings.create(
      input=ev.local_query, model=TEXT_EMBEDDING_MODEL
      )
      embedding = response.data[0].embedding
      # Retrieve relevant entities and gather their associated context:
      # - Text chunks where entities are mentioned
      # - Community reports the entities belong to
      # - Relationships between the retrieved entities
      # - Entity descriptions
      local_reports = driver.execute_query(
      """
      CALL db.index.vector.queryNodes('entity', 5, $embedding) YIELD node, score
      WITH collect(node) AS nodes
      WITH
      collect {
      UNWIND nodes as n
      MATCH (n)<-[:MENTIONS]->(c:__Chunk__)
      WITH c, count(distinct n) as freq
      RETURN {chunkText: c.text, source_id: 'chunk-' + c.id}
      ORDER BY freq DESC
      LIMIT 3
      } AS text_mapping,
      collect {
      UNWIND nodes as n
      MATCH (n)-[:IN_COMMUNITY*]->(c:__Community__)
      WHERE c.summary IS NOT NULL
      WITH c, c.rating as rank
      RETURN {summary: c.summary, source_id: 'community-' + c.id}
      ORDER BY rank DESC
      LIMIT 3
      } AS report_mapping,
      collect {
      UNWIND nodes as n
      MATCH (n)-[r:SUMMARIZED_RELATIONSHIP]-(m)
      WHERE m IN nodes
      RETURN {descriptionText: r.summary, source_id: 'relationship-' + n.name + '-' + m.name}
      LIMIT 3
      } as insideRels,
      collect {
      UNWIND nodes as n
      RETURN {descriptionText: n.summary, source_id: 'node-' + n.name}
      } as entities
      RETURN {Chunks: text_mapping, Reports: report_mapping,
      Relationships: insideRels,
      Entities: entities} AS output
      """,
      result_transformer_=lambda r: r.data(),
      embedding=embedding,
      )
      # Generate answer based on the retrieved context
      local_prompt = DRIFT_LOCAL_SYSTEM_PROMPT.format(
      response_type=DEFAULT_RESPONSE_TYPE,
      context_data=local_reports,
      global_query=ev.query,
      )
      local_response = await client.responses.create(
      model="gpt-5-mini",
      input=[{"role": "user", "content": local_prompt}],
      reasoning={"effort": "low"},
      )
      response_json = json_repair.loads(local_response.output_text)
      # Limit follow-up queries to prevent exponential growth
      response_json["follow_up_queries"] = response_json["follow_up_queries"][:LOCAL_TOP_K]
      return LocalSearchResults(results=response_json, query=ev.query)

      下一步負責編排迭代深化的過程。用collect_events等所有并行搜索跑完,然后判斷要不要繼續往下挖。如果當前深度還沒到上限(這里設的max depth=2),就把所有結果里的后續查詢提取出來,存好中間答案分發下一輪并行搜索。

      @step
      async def local_search_results(
      self, ctx: Context, ev: LocalSearchResults
      ) -> LocalSearch | FinalAnswer:
      local_search_num = await ctx.store.get("local_search_num")
      # Wait for all parallel searches to complete
      results = ctx.collect_events(ev, [LocalSearchResults] * local_search_num)
      if results is None:
      return None
      intermediate_results = [
      {
      "intermediate_answer": event.results["response"],
      "score": event.results["score"],
      }
      for event in results
      ]
      current_depth = await ctx.store.get("local_search_depth", default=1)
      query = [ev.query for ev in results][0]
      # Continue drilling down if we haven't reached max depth
      if current_depth < MAX_LOCAL_SEARCH_DEPTH:
      await ctx.store.set("local_search_depth", current_depth + 1)
      follow_up_queries = [
      query
      for event in results
      for query in event.results["follow_up_queries"]
      ]
      # Store intermediate answers and dispatch next round of searches
      async with ctx.store.edit_state() as ctx_state:
      ctx_state["intermediate_answers"].extend(intermediate_results)
      ctx_state["local_search_num"] = len(follow_up_queries)
      for local_query in follow_up_queries:
      ctx.send_event(LocalSearch(query=query, local_query=local_query))
      return None
      else:
      return FinalAnswer(query=query)

      這樣就形成了一個迭代細化的循環,每一層都在前一層的基礎上繼續深挖。達到最大深度后,觸發最終答案生成。

      最終答案

      最后一步把整個DRIFT搜索過程中積攢的所有中間答案匯總成一個完整的響應:這里包括社區搜索的初步答案,以及局部搜索各輪迭代產出的答案。

      @step
      async def final_answer_generation(self, ctx: Context, ev: FinalAnswer) -> StopEvent:
      # Retrieve all intermediate answers collected throughout the search process
      intermediate_answers = await ctx.store.get("intermediate_answers")
      # Synthesize all findings into a comprehensive final response
      answer_prompt = DRIFT_REDUCE_PROMPT.format(
      response_type=DEFAULT_RESPONSE_TYPE,
      context_data=intermediate_answers,
      global_query=ev.query,
      )
      answer_response = await client.responses.create(
      model="gpt-5-mini",
      input=[
      {"role": "developer", "content": answer_prompt},
      {"role": "user", "content": ev.query},
      ],
      reasoning={"effort": "low"},
      )
      return StopEvent(result=answer_response.output_text)

      總結

      DRIFT搜索提供了一個挺有意思的思路,在全局搜索的廣度和局部搜索的精度之間找到了平衡。從社區級上下文切入,通過迭代的后續查詢逐層下探,既避免了遍歷所有社區報告的計算負擔,又保證了覆蓋面。

      這里還有改進空間,比如目前的實現對所有中間答案一視同仁,如果能根據置信度分數做個篩選,最終答案的質量應該會更好,噪聲也能降下來。后續查詢也可以先按相關性或信息增益排個序,優先追蹤最有價值的線索。

      另一個值得嘗試的方向是加一個查詢精煉步驟,用LLM分析所有生成的后續查詢,把相似的歸并起來避免重復搜索,過濾掉那些大概率沒什么收獲的查詢。這樣能大幅減少局部搜索的次數,同時不影響答案質量。

      完整代碼

      https://avoid.overfit.cn/post/5eaca452dcc7422d8c2308586e7cfe56

      有興趣的可以自己跑跑看,或者在這個基礎上做些改進。

      作者:Tomaz Bratanic

      特別聲明:以上內容(如有圖片或視頻亦包括在內)為自媒體平臺“網易號”用戶上傳并發布,本平臺僅提供信息存儲服務。

      Notice: The content above (including the pictures and videos if any) is uploaded and posted by a user of NetEase Hao, which is a social media platform and only provides information storage services.

      相關推薦
      熱點推薦
      歐爾班宣布反制措施:在我們耗盡石油之前,烏克蘭人將先耗盡資金

      歐爾班宣布反制措施:在我們耗盡石油之前,烏克蘭人將先耗盡資金

      陳恧侃故事
      2026-03-07 11:31:12
      現場直擊:伊朗防空系統攔截美以目標

      現場直擊:伊朗防空系統攔截美以目標

      新華社
      2026-03-06 10:54:01
      鄭爽分享美國近況,穿搭不輸當紅明星,自曝做醫美網友直呼認不出

      鄭爽分享美國近況,穿搭不輸當紅明星,自曝做醫美網友直呼認不出

      萌神木木
      2026-03-06 17:33:33
      河南女孩六年前為救父親性命,稱誰給40萬就嫁給誰,如今過得如何

      河南女孩六年前為救父親性命,稱誰給40萬就嫁給誰,如今過得如何

      牛鍋巴小釩
      2026-03-06 19:45:04
      致敬西虹市首富?切爾西眾人開球前將球圍在中間,解說員啞然失笑

      致敬西虹市首富?切爾西眾人開球前將球圍在中間,解說員啞然失笑

      懂球帝
      2026-03-07 13:08:08
      廣西女子發現罕見青竹鯉,時不時側身蹭水底,網友:魚生天花板!

      廣西女子發現罕見青竹鯉,時不時側身蹭水底,網友:魚生天花板!

      貍貓之一的動物圈
      2026-03-06 09:38:48
      小學生實名投訴極氪 建議取消讓她寫作業的小桌板!極氪回應

      小學生實名投訴極氪 建議取消讓她寫作業的小桌板!極氪回應

      快科技
      2026-03-06 23:12:33
      王震堅決反對中顧委副主任排名,薄一波:我是常務,就這么定了

      王震堅決反對中顧委副主任排名,薄一波:我是常務,就這么定了

      芊芊子吟
      2026-03-06 09:45:07
      莫雷加德全家抵達重慶:對這座城市印象深刻,會請樊振東推薦美食

      莫雷加德全家抵達重慶:對這座城市印象深刻,會請樊振東推薦美食

      乒談
      2026-03-07 00:19:01
      中國女籃72-66再勝巴西,不是張子宇王思雨,她17+7成新核

      中國女籃72-66再勝巴西,不是張子宇王思雨,她17+7成新核

      林子說事
      2026-03-07 08:15:14
      速度滑冰世錦賽:寧忠巖收獲短距離全能、男子1000米兩項季軍

      速度滑冰世錦賽:寧忠巖收獲短距離全能、男子1000米兩項季軍

      懂球帝
      2026-03-07 07:21:57
      高市早苗被逼到絕路:派也死,不派也死

      高市早苗被逼到絕路:派也死,不派也死

      鯨探所長
      2026-03-07 12:02:36
      霍爾木茲海峽船只遭襲4死3重傷!兩萬海員被困,伊朗稱不會關閉海峽,但與以美有關船只不得通行;普京與伊總統通話:通過多種渠道保持聯系

      霍爾木茲海峽船只遭襲4死3重傷!兩萬海員被困,伊朗稱不會關閉海峽,但與以美有關船只不得通行;普京與伊總統通話:通過多種渠道保持聯系

      大風新聞
      2026-03-07 10:05:06
      霍震霆也沒想到,46歲的霍啟剛,會在兩會上憑一個舉動給霍家長臉

      霍震霆也沒想到,46歲的霍啟剛,會在兩會上憑一個舉動給霍家長臉

      攬星河的筆記
      2026-03-06 23:55:22
      結束了!整整27年生涯!曝冠軍主帥最后一舞

      結束了!整整27年生涯!曝冠軍主帥最后一舞

      籃球實戰寶典
      2026-03-06 18:57:43
      晴好周末,出游安排起來 | 天氣早知道

      晴好周末,出游安排起來 | 天氣早知道

      上觀新聞
      2026-03-07 11:57:06
      針對“不敢休、不讓休”怪圈,國家出手了!

      針對“不敢休、不讓休”怪圈,國家出手了!

      國是直通車
      2026-03-07 09:12:15
      村里紅白事從不回,男子母親離世,鄰居等著看笑話,結果長了見識

      村里紅白事從不回,男子母親離世,鄰居等著看笑話,結果長了見識

      子芫伴你成長
      2026-02-23 12:21:40
      重回國乒?塵埃落定,劉國梁發聲,崗位曝光,布局國乒男隊發展

      重回國乒?塵埃落定,劉國梁發聲,崗位曝光,布局國乒男隊發展

      卿子書
      2026-03-06 09:25:27
      比賽還沒開打,上海申花先迎來兩個壞消息,新賽季斬獲開門紅懸了

      比賽還沒開打,上海申花先迎來兩個壞消息,新賽季斬獲開門紅懸了

      零度眼看球
      2026-03-07 08:58:12
      2026-03-07 13:40:49
      deephub incentive-icons
      deephub
      CV NLP和數據挖掘知識
      1940文章數 1456關注度
      往期回顧 全部

      科技要聞

      OpenClaw爆火,六位"養蝦人"自述與AI共生

      頭條要聞

      特朗普突然放話"先解決伊朗后解決古巴" 梅西聽懵了

      頭條要聞

      特朗普突然放話"先解決伊朗后解決古巴" 梅西聽懵了

      體育要聞

      塔圖姆歸來:凱爾特人的春之綠

      娛樂要聞

      周杰倫田馥甄的“JH戀” 被扒得底朝天

      財經要聞

      針對"不敢休、不讓休"怪圈 國家出手了

      汽車要聞

      逃離ICU,上汽通用“止血”企穩

      態度原創

      教育
      房產
      旅游
      公開課
      軍事航空

      教育要聞

      兩會速遞|教育部部長:將實施新一輪學生心理健康促進行動

      房產要聞

      傳統學區房熄火?2月海口二手房爆火的板塊竟然是…

      旅游要聞

      文旅部部長:7名外國游客到上海旅游,買了40箱貨;“成為中國人”成了熱詞

      公開課

      李玫瑾:為什么性格比能力更重要?

      軍事要聞

      伊朗:使用無人機擊中美軍"林肯"號航母

      無障礙瀏覽 進入關懷版