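Code implementation
task / homework | Description |
---|---|
adv_subscriptor.py | Write an agent that crawls whatever content the user describes (the crawling code must be generated dynamically from the user's requirement) and sends the user notifications on a schedule |
adv_subscriptor2.py | 1. The RunSubscription action starts the SubscriptionRunner, which keeps RunSubscription from ever exiting; try to separate the two |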
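One possible way to approach the second homework item is to schedule the long-running loop as a background task so that the action itself can return. The sketch below uses only asyncio; `subscription_loop` and `run_subscription_action` are hypothetical stand-ins for SubscriptionRunner and RunSubscription, not MetaGPT APIs.

```python
import asyncio


async def subscription_loop(ticks: list, interval: float, stop: asyncio.Event) -> None:
    """Hypothetical stand-in for a long-running runner: loops until asked to stop."""
    while not stop.is_set():
        ticks.append("tick")
        await asyncio.sleep(interval)


async def run_subscription_action(ticks: list, stop: asyncio.Event) -> asyncio.Task:
    """Hypothetical stand-in for the action: schedule the loop and return at once."""
    # create_task schedules the coroutine on the event loop without awaiting it,
    # so this function finishes while the loop keeps running in the background.
    return asyncio.create_task(subscription_loop(ticks, 0.01, stop))


async def demo() -> int:
    ticks: list = []
    stop = asyncio.Event()
    task = await run_subscription_action(ticks, stop)  # returns right away
    await asyncio.sleep(0.05)  # the caller is free to do other work here
    stop.set()                 # ask the background loop to finish
    await task                 # wait for a clean shutdown
    return len(ticks)


print(asyncio.run(demo()) >= 1)
```

The caller keeps the returned task handle, so it can decide when to stop or await the runner instead of being blocked inside the action.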
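In the OSSWatcher example from §4, two of SubscriptionRunner's three elements, the Callback and the Trigger, can be reused, but the Role is not very reusable.
Approach 1: let the Role crawl the content of any website, then analyze the content and summarize it.
Approach 2: write an Agent that writes the code for a subscription Agent (SubscriptionRunner). Tell this Agent which web page to browse, have it write the code that crawls the page, generate the Role automatically, and call SubscriptionRunner to fulfil the subscription.
The aiohttp-based mechanism handles static pages, but its support for dynamic content that requires executing JavaScript is limited: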
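Feature / library | Playwright (Python) | aiohttp |
---|---|---|
Type | Browser automation library | Asynchronous HTTP request library |
Use cases | Dynamic pages, simulating user interaction, browser automation | High-performance asynchronous HTTP requests, crawling static pages |
Installation | pip install playwright | pip install aiohttp |
Dependencies | Playwright browser binaries (Chromium, etc.) | Standard library (asyncio, etc.) plus optional async-related libraries |
Learning curve | Relatively steep; involves browser automation concepts and APIs | Relatively gentle; experience with asynchronous programming helps |
Performance | Slower, since it simulates browser behavior | Faster, since it focuses on asynchronous HTTP requests |
Page interaction | Supported; can simulate a user's interaction in the browser | None; it mainly handles HTTP requests and responses |
JavaScript execution | Supported; can run JavaScript in the browser | Does not execute JavaScript |
Example uses | Crawling dynamic pages, automated testing, simulating user interaction | High-performance asynchronous crawlers, API requests, etc. |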
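To make the "JavaScript execution" row concrete, here is a small standard-library sketch of what an HTTP-only crawler sees. The `PAGE` markup and the `VisibleText` helper are made up for illustration; the point is only that text injected by a script is absent from the raw HTML that a plain HTTP request returns.

```python
from html.parser import HTMLParser

# Made-up page: the repository list is filled in by JavaScript at render time.
PAGE = """
<html><body>
  <h1>Trending</h1>
  <div id="repos"></div>
  <script>
    document.getElementById("repos").innerText = "repo-a, repo-b";
  </script>
</body></html>
"""


class VisibleText(HTMLParser):
    """Collects text nodes and skips <script> bodies, as a static crawler would."""

    def __init__(self):
        super().__init__()
        self.in_script = False
        self.chunks = []

    def handle_starttag(self, tag, attrs):
        if tag == "script":
            self.in_script = True

    def handle_endtag(self, tag):
        if tag == "script":
            self.in_script = False

    def handle_data(self, data):
        if not self.in_script and data.strip():
            self.chunks.append(data.strip())


def visible_text(html: str) -> list:
    parser = VisibleText()
    parser.feed(html)
    return parser.chunks


print(visible_text(PAGE))  # the repos div contributes no text
```

A browser-driven crawler would run the script first and then read the populated element, which is the case where a tool such as Playwright is the better fit.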
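task6.1 Completing the task
Implementation route for approach 2:

    // Please extract today's list from https://github.com/trending, recommend 2~3 LLM-related repositories with the reasons for recommending them, and send it to me at 7:30 pm
    // => executing the ParseSubRequirement action yields this structured output
    {
        "cron expression": "30 19 * * *",
        "crawler url list": ["http://github.com/trending"],
        "page content expression": "get today's list",
        "crawle post processing": "recommend 2~3 LLM-related repositories and give the reasons for recommending them"
    }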
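As a rough illustration of how the "cron expression" field could drive the trigger, the sketch below parses the structured output and computes the next fire time. `next_daily_run` is a hypothetical helper that only understands daily "minute hour * * *" schedules; a real trigger would more likely hand the expression to a cron library.

```python
import json
from datetime import datetime, timedelta

# A trimmed copy of the structured output shown above.
RAW = """{
    "cron expression": "30 19 * * *",
    "crawler url list": ["http://github.com/trending"]
}"""


def next_daily_run(cron: str, now: datetime) -> datetime:
    """Next fire time for a 'minute hour * * *' expression (daily schedules only)."""
    minute, hour, dom, month, dow = cron.split()
    if (dom, month, dow) != ("*", "*", "*"):
        raise ValueError("this sketch only handles daily schedules")
    candidate = now.replace(hour=int(hour), minute=int(minute), second=0, microsecond=0)
    # If today's slot has already passed, fire tomorrow instead.
    return candidate if candidate > now else candidate + timedelta(days=1)


spec = json.loads(RAW)
print(next_daily_run(spec["cron expression"], datetime(2024, 1, 1, 12, 0)))
```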
Figure 1. Class diagram for this example: Roles, Actions, ActionNodes, SubscriptionRunner, Trigger, Callback
Figure 2. The complete execution flow of this example
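This example exercises the basic usage of everything covered so far: Team, Role, Action, ActionNode, SubscriptionRunner, Trigger, Callback. The pseudocode below gives a high-level view of the key mechanisms in how MetaGPT executes:

    # start the team
    team = Team()
    team.hire([Role1(), Role2()])
    team.run_project("requirement")
    await team.run()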
How a Role executes:

    # environment.py: the run method of the Environment class
    async def run(self, k=1):
        """Process all Role runs at once"""
        for _ in range(k):
            futures = []
            for role in self.roles.values():
                future = role.run()
                futures.append(future)

            await asyncio.gather(*futures)
            logger.debug(f"is idle: {self.is_idle}")
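The collect-then-gather pattern in Environment.run can be tried in isolation. In this sketch, `ToyRole` and `run_round` are hypothetical stand-ins (not MetaGPT classes), and `asyncio.sleep` plays the part of a slow LLM call.

```python
import asyncio


class ToyRole:
    """Hypothetical role whose run() just sleeps to imitate a slow call."""

    def __init__(self, name: str, delay: float):
        self.name = name
        self.delay = delay

    async def run(self, log: list) -> None:
        log.append(f"{self.name} start")
        await asyncio.sleep(self.delay)
        log.append(f"{self.name} done")


async def run_round(roles: list, log: list) -> None:
    # Calling role.run() only creates coroutine objects; gather() then
    # drives them concurrently and returns once all of them have finished.
    futures = [role.run(log) for role in roles]
    await asyncio.gather(*futures)


log: list = []
asyncio.run(run_round([ToyRole("A", 0.05), ToyRole("B", 0.01)], log))
print(log)
```

Both roles start before either finishes, which is why one round costs roughly as long as the slowest role rather than the sum of all roles.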
    # role.py: the run method of the Role class
    # Role.run is often overridden; the original version is:
    @role_raise_decorator
    async def run(self, with_message=None) -> Message | None:
        """Observe, and think and act based on the results of the observation"""
        # If a with_message was passed in, handle it first
        if with_message:
            # omitted...
            if not msg.cause_by:
                # Set the message's cause_by (the action that emitted it) to the preset UserRequirement
                msg.cause_by = UserRequirement
            self.put_message(msg)
        if not await self._observe():
            # If there is no new information, suspend and wait
            logger.debug(f"{self._setting}: no news. waiting.")
            return

        rsp = await self.react()

        # Reset the next action to be taken.
        self.rc.todo = None
        # Send the response message to the Environment object to have it relay the message to the subscribers.
        self.publish_message(rsp)
        return rsp
    async def _observe(self, ignore_memory=False) -> int:
        """Prepare new messages for processing from the message buffer and other sources."""
        # Read unprocessed messages from the msg buffer.
        news = []
        if self.recovered:
            news = [self.latest_observed_msg] if self.latest_observed_msg else []
        if not news:
            # A role's msg_buffer is filled when the environment object's publish_message method is called.
            # publish_message is called at two points: team.run_project() and role.run()
            news = self.rc.msg_buffer.pop_all()
        old_messages = [] if ignore_memory else self.rc.memory.get()
        # Add news to memory: the last message needed for recovery, plus any messages from rc.msg_buffer
        self.rc.memory.add_batch(news)
        logger.debug(f"role setting: {self._setting}, _observe: before filter, news={news}, role is watching actions, watch={self.rc.watch}")
        # Filter out messages of interest.
        self.rc.news = [
            n for n in news if (n.cause_by in self.rc.watch or self.name in n.send_to) and n not in old_messages
        ]
        logger.debug(f"role setting: {self._setting}, _observe: after filter, news={self.rc.news}, len(self.rc.news)={len(self.rc.news)}")
        self.latest_observed_msg = self.rc.news[-1] if self.rc.news else None  # record the latest observed msg

        news_text = [f"{i.role}: {i.content[:20]}..." for i in self.rc.news]
        if news_text:
            logger.debug(f"{self._setting} observed: {news_text}")
        return len(self.rc.news)
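The filtering step in `_observe` is the part that decides whether a role wakes up, so it is worth trying on its own. The sketch below reproduces only that list comprehension; `Msg` and `filter_news` are simplified, hypothetical stand-ins in which cause_by is a plain string rather than an action class.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class Msg:
    """Simplified message: just the fields the filter looks at."""
    content: str
    cause_by: str
    send_to: frozenset = field(default_factory=frozenset)


def filter_news(news, watch, name, old_messages):
    """Keep messages that are watched or addressed to `name`, and not already seen."""
    return [
        m for m in news
        if (m.cause_by in watch or name in m.send_to) and m not in old_messages
    ]


watch = {"ParseSubRequirement"}
old = [Msg("old", "ParseSubRequirement")]
news = [
    Msg("a", "ParseSubRequirement"),                    # watched action: kept
    Msg("b", "Other"),                                  # not watched, not addressed: dropped
    Msg("c", "Other", frozenset({"CrawlerEngineer"})),  # addressed by name: kept
    Msg("old", "ParseSubRequirement"),                  # already in memory: dropped
]
print([m.content for m in filter_news(news, watch, "CrawlerEngineer", old)])
```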
With `_watch([ParseSubRequirement])`, the CrawlEngineer role can observe that message and go on to run the subsequent react().

    class SubscriptionAssistant(Role):
        def __init__(self, **kwargs):
            ...
            # It watches Messages emitted by the preset UserRequirement action, so it runs first;
            # it also watches WriteCrawleCode: once the CrawleEngineer role finishes the WriteCrawleCode action and publishes that message, SubscriptionAssistant observes it and continues;
            self._watch([UserRequirement, WriteCrawleCode])

        async def _act(self) -> Message:
            logger.info(f"_act(): {self._setting}: to do {self.rc.todo}({self.rc.todo.name}), self.rc.history={self.rc.history}")
            # Call the action's run method, passing in every Message in memory
            response = await self.rc.todo.run(self.rc.history)
            # Handle the response
            # ActionNode.run returns an ActionNode, so it has to be converted into a Message
            if isinstance(response, (ActionOutput, ActionNode)):
                msg = Message(
                    content=response.content,
                    instruct_content=response.instruct_content,
                    role=self._setting,
                    cause_by=self.rc.todo,
                    sent_from=self,
                )
            elif isinstance(response, Message):
                msg = response
            else:
                msg = Message(content=response, role=self.profile, cause_by=self.rc.todo, sent_from=self)
            # Add the message to memory
            self.rc.memory.add(msg)
            return msg
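The branching at the end of `_act` normalizes whatever the action returned into a single message type. A stripped-down sketch of that idea follows; `ToyMessage`, `ToyActionOutput` and `to_message` are hypothetical names used only for illustration and carry far fewer fields than the MetaGPT classes.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class ToyMessage:
    content: str
    cause_by: str
    instruct_content: Optional[Any] = None


@dataclass
class ToyActionOutput:
    content: str
    instruct_content: Any


def to_message(response, cause_by: str) -> ToyMessage:
    """Wrap whatever an action returned into a ToyMessage."""
    if isinstance(response, ToyActionOutput):
        # Structured output: keep both the raw text and the parsed fields.
        return ToyMessage(response.content, cause_by, response.instruct_content)
    if isinstance(response, ToyMessage):
        return response  # already a message, pass it through unchanged
    return ToyMessage(str(response), cause_by)  # plain value: use it as the content


structured = ToyActionOutput("raw text", {"cron expression": "30 19 * * *"})
print(to_message(structured, "ParseSubRequirement"))
print(to_message("plain string", "WriteCrawleCode"))
```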
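**Analysis of team.hire():** this process mainly does 3 things:
Add the roles in the role list to the environment object's roles;
Set which messages are subscribed to, in both the environment and the role: by default the tag is `{ f"{module}.{name}" }`, but a role can also set its subscription tags through subscribe(), and whenever role.subscribe is called the change must also be pushed to the env:

    def subscribe(self, tags: Set[str]):
        self.subscription = tags
        if self.rc.env:  # According to the routing feature plan in Chapter 2.2.3.2 of RFC 113
            self.rc.env.set_subscription(self, self.subscription)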
`set_subscription()` writes to env.members, a dict[Role, Set] in which the env maps each role to the "send_to" tags that the role subscribes to (that is, when a message's send_to contains one of the tags in the role's subscription, the message is meant for that role); such a message is then put into the role's msg_buffer via put_message;
`send_to: set[str] = Field(default={MESSAGE_ROUTE_TO_ALL}, validate_default=True)`: a Message's send_to defaults to MESSAGE_ROUTE_TO_ALL, which makes is_subscribed(message, subscription) always True, so the message is added to every role's msg_buffer;

    # This is Environment.publish_message
    def publish_message(self, message: Message, peekable: bool = True) -> bool:
        logger.debug(f"publish_message: {message.dump()}")
        found = False
        # According to the routing feature plan in Chapter 2.2.3.2 of RFC 113
        for role, subscription in self.members.items():
            if is_subscribed(message, subscription):
                role.put_message(message)
                found = True
        if not found:
            logger.warning(f"Message no recipients: {message.dump()}")
        self.history += f"\n{message}"  # For debug
        return True
    # From common.py: checks whether any tag in the role's subscription (i.e. name or profile of a role) is in message.send_to;
    # a message carrying MESSAGE_ROUTE_TO_ALL is delivered to every role's msg_buffer;
    def is_subscribed(message: "Message", tags: set):
        # By default, a Message's send_to is MESSAGE_ROUTE_TO_ALL
        if MESSAGE_ROUTE_TO_ALL in message.send_to:
            return True

        for i in tags:
            if i in message.send_to:
                return True
        return False
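Putting publish_message and is_subscribed together, the routing rule can be exercised with a toy environment. Everything here is a hypothetical simplification: `ToyEnv` keys its members by role name instead of Role objects, `ROUTE_TO_ALL` is a placeholder sentinel rather than MetaGPT's actual constant value, and `publish` returns whether a recipient was found (the original always returns True).

```python
ROUTE_TO_ALL = "<all>"  # placeholder sentinel, not MetaGPT's actual constant value


def is_subscribed(send_to: set, tags: set) -> bool:
    """True for a broadcast, or when any subscription tag appears in send_to."""
    return ROUTE_TO_ALL in send_to or bool(tags & send_to)


class ToyEnv:
    def __init__(self):
        self.members = {}  # role name -> subscription tags
        self.buffers = {}  # role name -> delivered messages

    def add_role(self, name: str, tags: set) -> None:
        self.members[name] = set(tags)
        self.buffers[name] = []

    def publish(self, content: str, send_to=None) -> bool:
        # No explicit recipients means broadcast, mirroring the default send_to.
        send_to = {ROUTE_TO_ALL} if send_to is None else set(send_to)
        found = False
        for name, tags in self.members.items():
            if is_subscribed(send_to, tags):
                self.buffers[name].append(content)
                found = True
        return found


env = ToyEnv()
env.add_role("assistant", {"assistant"})
env.add_role("engineer", {"engineer"})
env.publish("hello")                # broadcast: reaches both roles
env.publish("crawl", {"engineer"})  # targeted: reaches only the engineer
print(env.buffers)
```

Note that delivery into the buffer is only the first gate; the role's `_observe` filter still decides whether the message actually triggers an action.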