镜像自地址
https://github.com/binary-husky/gpt_academic.git
已同步 2025-12-07 15:06:48 +00:00
follow master
这个提交包含在:
151
toolbox.py
151
toolbox.py
@@ -5,6 +5,8 @@ import inspect
|
||||
import re
|
||||
import os
|
||||
import gradio
|
||||
import shutil
|
||||
import glob
|
||||
from latex2mathml.converter import convert as tex2mathml
|
||||
from functools import wraps, lru_cache
|
||||
pj = os.path.join
|
||||
@@ -77,14 +79,24 @@ def ArgsGeneralWrapper(f):
|
||||
}
|
||||
chatbot_with_cookie = ChatBotWithCookies(cookies)
|
||||
chatbot_with_cookie.write_list(chatbot)
|
||||
|
||||
if cookies.get('lock_plugin', None) is None:
|
||||
# 正常状态
|
||||
yield from f(txt_passon, llm_kwargs, plugin_kwargs, chatbot_with_cookie, history, system_prompt, *args)
|
||||
if len(args) == 0: # 插件通道
|
||||
yield from f(txt_passon, llm_kwargs, plugin_kwargs, chatbot_with_cookie, history, system_prompt, request)
|
||||
else: # 对话通道,或者基础功能通道
|
||||
yield from f(txt_passon, llm_kwargs, plugin_kwargs, chatbot_with_cookie, history, system_prompt, *args)
|
||||
else:
|
||||
# 处理个别特殊插件的锁定状态
|
||||
# 处理少数情况下的特殊插件的锁定状态
|
||||
module, fn_name = cookies['lock_plugin'].split('->')
|
||||
f_hot_reload = getattr(importlib.import_module(module, fn_name), fn_name)
|
||||
yield from f_hot_reload(txt_passon, llm_kwargs, plugin_kwargs, chatbot_with_cookie, history, system_prompt, request)
|
||||
# 判断一下用户是否错误地通过对话通道进入,如果是,则进行提醒
|
||||
final_cookies = chatbot_with_cookie.get_cookies()
|
||||
# len(args) != 0 代表“提交”键对话通道,或者基础功能通道
|
||||
if len(args) != 0 and 'files_to_promote' in final_cookies and len(final_cookies['files_to_promote']) > 0:
|
||||
chatbot_with_cookie.append(["检测到**滞留的缓存文档**,请及时处理。", "请及时点击“**保存当前对话**”获取所有滞留文档。"])
|
||||
yield from update_ui(chatbot_with_cookie, final_cookies['history'], msg="检测到被滞留的缓存文档")
|
||||
return decorated
|
||||
|
||||
|
||||
@@ -94,7 +106,8 @@ def update_ui(chatbot, history, msg='正常', **kwargs): # 刷新界面
|
||||
"""
|
||||
assert isinstance(chatbot, ChatBotWithCookies), "在传递chatbot的过程中不要将其丢弃。必要时, 可用clear将其清空, 然后用for+append循环重新赋值。"
|
||||
cookies = chatbot.get_cookies()
|
||||
|
||||
# 备份一份History作为记录
|
||||
cookies.update({'history': history})
|
||||
# 解决插件锁定时的界面显示问题
|
||||
if cookies.get('lock_plugin', None):
|
||||
label = cookies.get('llm_model', "") + " | " + "正在锁定插件" + cookies.get('lock_plugin', None)
|
||||
@@ -171,7 +184,7 @@ def HotReload(f):
|
||||
========================================================================
|
||||
第二部分
|
||||
其他小工具:
|
||||
- write_results_to_file: 将结果写入markdown文件中
|
||||
- write_history_to_file: 将结果写入markdown文件中
|
||||
- regular_txt_to_markdown: 将普通文本转换为Markdown格式的文本。
|
||||
- report_execption: 向chatbot中添加简单的意外错误信息
|
||||
- text_divide_paragraph: 将文本按照段落分隔符分割开,生成带有段落标签的HTML代码。
|
||||
@@ -203,36 +216,6 @@ def get_reduce_token_percent(text):
|
||||
return 0.5, '不详'
|
||||
|
||||
|
||||
def write_results_to_file(history, file_name=None):
|
||||
"""
|
||||
将对话记录history以Markdown格式写入文件中。如果没有指定文件名,则使用当前时间生成文件名。
|
||||
"""
|
||||
import os
|
||||
import time
|
||||
if file_name is None:
|
||||
# file_name = time.strftime("chatGPT分析报告%Y-%m-%d-%H-%M-%S", time.localtime()) + '.md'
|
||||
file_name = 'GPT-Report-' + gen_time_str() + '.md'
|
||||
os.makedirs('./gpt_log/', exist_ok=True)
|
||||
with open(f'./gpt_log/{file_name}', 'w', encoding='utf8') as f:
|
||||
f.write('# GPT-Academic Report\n')
|
||||
for i, content in enumerate(history):
|
||||
try:
|
||||
if type(content) != str: content = str(content)
|
||||
except:
|
||||
continue
|
||||
if i % 2 == 0:
|
||||
f.write('## ')
|
||||
try:
|
||||
f.write(content)
|
||||
except:
|
||||
# remove everything that cannot be handled by utf8
|
||||
f.write(content.encode('utf-8', 'ignore').decode())
|
||||
f.write('\n\n')
|
||||
res = '以上材料已经被写入:\t' + os.path.abspath(f'./gpt_log/{file_name}')
|
||||
print(res)
|
||||
return res
|
||||
|
||||
|
||||
def write_history_to_file(history, file_basename=None, file_fullname=None):
|
||||
"""
|
||||
将对话记录history以Markdown格式写入文件中。如果没有指定文件名,则使用当前时间生成文件名。
|
||||
@@ -241,9 +224,9 @@ def write_history_to_file(history, file_basename=None, file_fullname=None):
|
||||
import time
|
||||
if file_fullname is None:
|
||||
if file_basename is not None:
|
||||
file_fullname = os.path.join(get_log_folder(), file_basename)
|
||||
file_fullname = pj(get_log_folder(), file_basename)
|
||||
else:
|
||||
file_fullname = os.path.join(get_log_folder(), f'GPT-Academic-{gen_time_str()}.md')
|
||||
file_fullname = pj(get_log_folder(), f'GPT-Academic-{gen_time_str()}.md')
|
||||
os.makedirs(os.path.dirname(file_fullname), exist_ok=True)
|
||||
with open(file_fullname, 'w', encoding='utf8') as f:
|
||||
f.write('# GPT-Academic Report\n')
|
||||
@@ -519,7 +502,7 @@ def find_recent_files(directory):
|
||||
if not os.path.exists(directory):
|
||||
os.makedirs(directory, exist_ok=True)
|
||||
for filename in os.listdir(directory):
|
||||
file_path = os.path.join(directory, filename)
|
||||
file_path = pj(directory, filename)
|
||||
if file_path.endswith('.log'):
|
||||
continue
|
||||
created_time = os.path.getmtime(file_path)
|
||||
@@ -534,7 +517,7 @@ def promote_file_to_downloadzone(file, rename_file=None, chatbot=None):
|
||||
# 将文件复制一份到下载区
|
||||
import shutil
|
||||
if rename_file is None: rename_file = f'{gen_time_str()}-{os.path.basename(file)}'
|
||||
new_path = os.path.join(get_log_folder(), rename_file)
|
||||
new_path = pj(get_log_folder(), rename_file)
|
||||
# 如果已经存在,先删除
|
||||
if os.path.exists(new_path) and not os.path.samefile(new_path, file): os.remove(new_path)
|
||||
# 把文件复制过去
|
||||
@@ -549,44 +532,70 @@ def disable_auto_promotion(chatbot):
|
||||
chatbot._cookies.update({'files_to_promote': []})
|
||||
return
|
||||
|
||||
def on_file_uploaded(files, chatbot, txt, txt2, checkboxes, cookies):
|
||||
def is_the_upload_folder(string):
|
||||
PATH_PRIVATE_UPLOAD, = get_conf('PATH_PRIVATE_UPLOAD')
|
||||
pattern = r'^PATH_PRIVATE_UPLOAD/[A-Za-z0-9_-]+/\d{4}-\d{2}-\d{2}-\d{2}-\d{2}-\d{2}$'
|
||||
pattern = pattern.replace('PATH_PRIVATE_UPLOAD', PATH_PRIVATE_UPLOAD)
|
||||
if re.match(pattern, string): return True
|
||||
else: return False
|
||||
|
||||
def del_outdated_uploads(outdate_time_seconds):
|
||||
PATH_PRIVATE_UPLOAD, = get_conf('PATH_PRIVATE_UPLOAD')
|
||||
current_time = time.time()
|
||||
one_hour_ago = current_time - outdate_time_seconds
|
||||
# Get a list of all subdirectories in the PATH_PRIVATE_UPLOAD folder
|
||||
# Remove subdirectories that are older than one hour
|
||||
for subdirectory in glob.glob(f'{PATH_PRIVATE_UPLOAD}/*/*'):
|
||||
subdirectory_time = os.path.getmtime(subdirectory)
|
||||
if subdirectory_time < one_hour_ago:
|
||||
try: shutil.rmtree(subdirectory)
|
||||
except: pass
|
||||
return
|
||||
|
||||
def on_file_uploaded(request: gradio.Request, files, chatbot, txt, txt2, checkboxes, cookies):
|
||||
"""
|
||||
当文件被上传时的回调函数
|
||||
"""
|
||||
if len(files) == 0:
|
||||
return chatbot, txt
|
||||
import shutil
|
||||
import os
|
||||
import time
|
||||
import glob
|
||||
from toolbox import extract_archive
|
||||
try:
|
||||
shutil.rmtree('./private_upload/')
|
||||
except:
|
||||
pass
|
||||
|
||||
# 移除过时的旧文件从而节省空间&保护隐私
|
||||
outdate_time_seconds = 60
|
||||
del_outdated_uploads(outdate_time_seconds)
|
||||
|
||||
# 创建工作路径
|
||||
user_name = "default" if not request.username else request.username
|
||||
time_tag = gen_time_str()
|
||||
os.makedirs(f'private_upload/{time_tag}', exist_ok=True)
|
||||
err_msg = ''
|
||||
PATH_PRIVATE_UPLOAD, = get_conf('PATH_PRIVATE_UPLOAD')
|
||||
target_path_base = pj(PATH_PRIVATE_UPLOAD, user_name, time_tag)
|
||||
os.makedirs(target_path_base, exist_ok=True)
|
||||
|
||||
# 逐个文件转移到目标路径
|
||||
upload_msg = ''
|
||||
for file in files:
|
||||
file_origin_name = os.path.basename(file.orig_name)
|
||||
shutil.copy(file.name, f'private_upload/{time_tag}/{file_origin_name}')
|
||||
err_msg += extract_archive(f'private_upload/{time_tag}/{file_origin_name}',
|
||||
dest_dir=f'private_upload/{time_tag}/{file_origin_name}.extract')
|
||||
moved_files = [fp for fp in glob.glob('private_upload/**/*', recursive=True)]
|
||||
if "底部输入区" in checkboxes:
|
||||
txt = ""
|
||||
txt2 = f'private_upload/{time_tag}'
|
||||
this_file_path = pj(target_path_base, file_origin_name)
|
||||
shutil.move(file.name, this_file_path)
|
||||
upload_msg += extract_archive(file_path=this_file_path, dest_dir=this_file_path+'.extract')
|
||||
|
||||
# 整理文件集合
|
||||
moved_files = [fp for fp in glob.glob(f'{target_path_base}/**/*', recursive=True)]
|
||||
if "底部输入区" in checkboxes:
|
||||
txt, txt2 = "", target_path_base
|
||||
else:
|
||||
txt = f'private_upload/{time_tag}'
|
||||
txt2 = ""
|
||||
txt, txt2 = target_path_base, ""
|
||||
|
||||
# 输出消息
|
||||
moved_files_str = '\t\n\n'.join(moved_files)
|
||||
chatbot.append(['我上传了文件,请查收',
|
||||
chatbot.append(['我上传了文件,请查收',
|
||||
f'[Local Message] 收到以下文件: \n\n{moved_files_str}' +
|
||||
f'\n\n调用路径参数已自动修正到: \n\n{txt}' +
|
||||
f'\n\n现在您点击任意函数插件时,以上文件将被作为输入参数'+err_msg])
|
||||
f'\n\n现在您点击任意函数插件时,以上文件将被作为输入参数'+upload_msg])
|
||||
|
||||
# 记录近期文件
|
||||
cookies.update({
|
||||
'most_recent_uploaded': {
|
||||
'path': f'private_upload/{time_tag}',
|
||||
'path': target_path_base,
|
||||
'time': time.time(),
|
||||
'time_str': time_tag
|
||||
}})
|
||||
@@ -595,11 +604,12 @@ def on_file_uploaded(files, chatbot, txt, txt2, checkboxes, cookies):
|
||||
|
||||
def on_report_generated(cookies, files, chatbot):
|
||||
from toolbox import find_recent_files
|
||||
PATH_LOGGING, = get_conf('PATH_LOGGING')
|
||||
if 'files_to_promote' in cookies:
|
||||
report_files = cookies['files_to_promote']
|
||||
cookies.pop('files_to_promote')
|
||||
else:
|
||||
report_files = find_recent_files('gpt_log')
|
||||
report_files = find_recent_files(PATH_LOGGING)
|
||||
if len(report_files) == 0:
|
||||
return cookies, None, chatbot
|
||||
# files.extend(report_files)
|
||||
@@ -909,34 +919,35 @@ def zip_folder(source_folder, dest_folder, zip_name):
|
||||
return
|
||||
|
||||
# Create the name for the zip file
|
||||
zip_file = os.path.join(dest_folder, zip_name)
|
||||
zip_file = pj(dest_folder, zip_name)
|
||||
|
||||
# Create a ZipFile object
|
||||
with zipfile.ZipFile(zip_file, 'w', zipfile.ZIP_DEFLATED) as zipf:
|
||||
# Walk through the source folder and add files to the zip file
|
||||
for foldername, subfolders, filenames in os.walk(source_folder):
|
||||
for filename in filenames:
|
||||
filepath = os.path.join(foldername, filename)
|
||||
filepath = pj(foldername, filename)
|
||||
zipf.write(filepath, arcname=os.path.relpath(filepath, source_folder))
|
||||
|
||||
# Move the zip file to the destination folder (if it wasn't already there)
|
||||
if os.path.dirname(zip_file) != dest_folder:
|
||||
os.rename(zip_file, os.path.join(dest_folder, os.path.basename(zip_file)))
|
||||
zip_file = os.path.join(dest_folder, os.path.basename(zip_file))
|
||||
os.rename(zip_file, pj(dest_folder, os.path.basename(zip_file)))
|
||||
zip_file = pj(dest_folder, os.path.basename(zip_file))
|
||||
|
||||
print(f"Zip file created at {zip_file}")
|
||||
|
||||
def zip_result(folder):
|
||||
t = gen_time_str()
|
||||
zip_folder(folder, './gpt_log/', f'{t}-result.zip')
|
||||
return pj('./gpt_log/', f'{t}-result.zip')
|
||||
zip_folder(folder, get_log_folder(), f'{t}-result.zip')
|
||||
return pj(get_log_folder(), f'{t}-result.zip')
|
||||
|
||||
def gen_time_str():
|
||||
import time
|
||||
return time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime())
|
||||
|
||||
def get_log_folder(user='default', plugin_name='shared'):
|
||||
_dir = os.path.join(os.path.dirname(__file__), 'gpt_log', user, plugin_name)
|
||||
PATH_LOGGING, = get_conf('PATH_LOGGING')
|
||||
_dir = pj(PATH_LOGGING, user, plugin_name)
|
||||
if not os.path.exists(_dir): os.makedirs(_dir)
|
||||
return _dir
|
||||
|
||||
|
||||
在新工单中引用
屏蔽一个用户