Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 19 additions & 57 deletions apps/application/flow/workflow_manage.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,62 +234,25 @@ def run_block(self, language='zh'):
非流式响应
@return: 结果
"""
try:
self.run_chain_async(None, None, language)
while self.is_run():
pass
details = self.get_runtime_details()
message_tokens = sum([row.get('message_tokens') for row in details.values() if
'message_tokens' in row and row.get('message_tokens') is not None])
answer_tokens = sum([row.get('answer_tokens') for row in details.values() if
'answer_tokens' in row and row.get('answer_tokens') is not None])
answer_text_list = self.get_answer_text_list()
answer_text = '\n\n'.join(
'\n\n'.join([a.get('content') for a in answer]) for answer in
answer_text_list)
answer_list = reduce(lambda pre, _n: [*pre, *_n], answer_text_list, [])
self.work_flow_post_handler.handler(self)

res = self.base_to_response.to_block_response(self.params['chat_id'],
self.params['chat_record_id'], answer_text, True
, message_tokens, answer_tokens,
_status=status.HTTP_200_OK if self.status == 200 else status.HTTP_500_INTERNAL_SERVER_ERROR,
other_params={'answer_list': answer_list})
finally:
self._cleanup()
return res

def _cleanup(self):
"""清理所有对象引用"""
# 清理列表
self.future_list.clear()
self.field_list.clear()
self.global_field_list.clear()
self.chat_field_list.clear()
self.image_list.clear()
self.video_list.clear()
self.document_list.clear()
self.audio_list.clear()
self.other_list.clear()
if hasattr(self, 'node_context'):
self.node_context.clear()

# 清理字典
self.context.clear()
self.chat_context.clear()
self.form_data.clear()

# 清理对象引用
self.node_chunk_manage = None
self.work_flow_post_handler = None
self.flow = None
self.start_node = None
self.current_node = None
self.current_result = None
self.chat_record = None
self.base_to_response = None
self.params = None
self.lock = None
self.run_chain_async(None, None, language)
while self.is_run():
pass
details = self.get_runtime_details()
message_tokens = sum([row.get('message_tokens') for row in details.values() if
'message_tokens' in row and row.get('message_tokens') is not None])
answer_tokens = sum([row.get('answer_tokens') for row in details.values() if
'answer_tokens' in row and row.get('answer_tokens') is not None])
answer_text_list = self.get_answer_text_list()
answer_text = '\n\n'.join(
'\n\n'.join([a.get('content') for a in answer]) for answer in
answer_text_list)
answer_list = reduce(lambda pre, _n: [*pre, *_n], answer_text_list, [])
self.work_flow_post_handler.handler(self)
return self.base_to_response.to_block_response(self.params['chat_id'],
self.params['chat_record_id'], answer_text, True
, message_tokens, answer_tokens,
_status=status.HTTP_200_OK if self.status == 200 else status.HTTP_500_INTERNAL_SERVER_ERROR,
other_params={'answer_list': answer_list})

def run_stream(self, current_node, node_result_future, language='zh'):
"""
Expand Down Expand Up @@ -344,7 +307,6 @@ def await_result(self):
'',
[],
'', True, message_tokens, answer_tokens, {})
self._cleanup()

def run_chain_async(self, current_node, node_result_future, language='zh'):
future = executor.submit(self.run_chain_manage, current_node, node_result_future, language)
Expand Down
Loading