mirror of https://github.com/flucont/btcloud.git
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
374 lines
12 KiB
374 lines
12 KiB
#coding: utf-8
|
|
# +-------------------------------------------------------------------
|
|
# | 宝塔Linux面板
|
|
# +-------------------------------------------------------------------
|
|
# | Copyright (c) 2015-2099 宝塔软件(http://bt.cn) All rights reserved.
|
|
# +-------------------------------------------------------------------
|
|
# | Author: hwliang <hwl@bt.cn>
|
|
# +-------------------------------------------------------------------
|
|
|
|
#+--------------------------------------------------------------------
|
|
#| 插件和模块加载器
|
|
#+--------------------------------------------------------------------
|
|
|
|
import public,os,sys,json
|
|
|
|
def plugin_run(plugin_name,def_name,args):
|
|
'''
|
|
@name 执行插件方法
|
|
@param plugin_name<string> 插件名称
|
|
@param def_name<string> 方法名称
|
|
@param args<dict_obj> 参数对像
|
|
@return mixed
|
|
'''
|
|
if not plugin_name or not def_name: return public.returnMsg(False,'插件名称和插件方法名称不能为空!')
|
|
|
|
# 获取插件目录
|
|
plugin_path = public.get_plugin_path(plugin_name)
|
|
is_php = os.path.exists(os.path.join(plugin_path,'index.php'))
|
|
|
|
# 检查插件目录是否合法
|
|
if is_php:
|
|
plugin_file = os.path.join(plugin_path,'index.php')
|
|
else:
|
|
plugin_file = os.path.join(plugin_path, plugin_name + '_main.py')
|
|
if not public.path_safe_check(plugin_file): return public.returnMsg(False,'插件路径不合法')
|
|
|
|
# 检查插件入口文件是否存在
|
|
if not os.path.exists(plugin_file): return public.returnMsg(False,'指定插件入口文件不存在')
|
|
|
|
# 添加插件目录到系统路径
|
|
public.sys_path_append(plugin_path)
|
|
|
|
try:
|
|
if not is_php:
|
|
# 引用插件入口文件
|
|
_name = "{}_main".format(plugin_name)
|
|
plugin_main = __import__(_name)
|
|
|
|
# 检查类名是否符合规范
|
|
if not hasattr(plugin_main,_name):
|
|
return public.returnMsg(False,'指定插件入口文件不符合规范')
|
|
|
|
try:
|
|
if sys.version_info[0] == 2:
|
|
reload(plugin_main)
|
|
else:
|
|
from imp import reload
|
|
reload(plugin_main)
|
|
except:
|
|
pass
|
|
|
|
# 实例化插件类
|
|
plugin_obj = getattr(plugin_main,_name)()
|
|
|
|
# 检查方法是否存在
|
|
if not hasattr(plugin_obj,def_name):
|
|
return public.returnMsg(False,'在[%s]插件中找不到[%s]方法' % (plugin_name,def_name))
|
|
|
|
if 'plugin_get_object' in args and args.plugin_get_object == 1:
|
|
return getattr(plugin_obj, def_name)
|
|
|
|
# 执行方法
|
|
return getattr(plugin_obj,def_name)(args)
|
|
else:
|
|
if 'plugin_get_object' in args and args.plugin_get_object == 1:
|
|
return None
|
|
import panelPHP
|
|
args.s = def_name
|
|
args.name = plugin_name
|
|
return panelPHP.panelPHP(plugin_name).exec_php_script(args)
|
|
|
|
except SyntaxError as ex:
|
|
return public.returnMsg(False,'指定插件不兼容当前操作系统')
|
|
except Exception as ex:
|
|
public.print_error()
|
|
return public.returnMsg(False,'指定插件不存在')
|
|
|
|
|
|
def get_module_list():
|
|
'''
|
|
@name 获取模块列表
|
|
@return list
|
|
'''
|
|
module_list = []
|
|
class_path = public.get_class_path()
|
|
for name in os.listdir(class_path):
|
|
path = os.path.join(class_path,name)
|
|
# 过滤无效文件
|
|
if not name or name.endswith('.py') or name[0] == '.' or not name.endswith('Model') or os.path.isfile(path):continue
|
|
module_list.append(name)
|
|
return module_list
|
|
|
|
def module_run(module_name,def_name,args):
|
|
'''
|
|
@name 执行模块方法
|
|
@param module_name<string> 模块名称
|
|
@param def_name<string> 方法名称
|
|
@param args<dict_obj> 参数对像
|
|
@return mixed
|
|
'''
|
|
if not module_name or not def_name: return public.returnMsg(False,'模块名称和模块方法名称不能为空!')
|
|
model_index = args.get('model_index',None)
|
|
class_path = public.get_class_path()
|
|
panel_path = public.get_panel_path()
|
|
|
|
module_file = None
|
|
if model_index:
|
|
# 新模块目录
|
|
if model_index in ['mod']:
|
|
_name = "{}Mod".format(module_name)
|
|
module_file = os.path.join(panel_path,'mod','project',module_name + 'Mod.py')
|
|
elif model_index:
|
|
# 旧模块目录
|
|
_name = "{}Model".format(module_name)
|
|
module_file = os.path.join(class_path,model_index+"Model",module_name + 'Model.py')
|
|
else:
|
|
_name = "{}Model".format(module_name)
|
|
module_file = os.path.join(class_path,"projectModel",module_name + 'Model.py')
|
|
else:
|
|
# 如果没指定模块名称,则遍历所有模块目录
|
|
module_list = get_module_list()
|
|
for name in module_list:
|
|
module_file = os.path.join(class_path,name,module_name + 'Model.py')
|
|
if os.path.exists(module_file):
|
|
_name = "{}Model".format(module_name)
|
|
break
|
|
|
|
# 判断模块入口文件是否存在
|
|
if not os.path.exists(module_file):
|
|
return public.returnMsg(False,'模块[%s]不存在' % module_name)
|
|
|
|
# 判断模块路径是否合法
|
|
if not public.path_safe_check(module_file):
|
|
return public.returnMsg(False,'模块路径不合法')
|
|
|
|
public.sys_path_append(os.path.dirname(module_file))
|
|
try:
|
|
# 引用模块入口文件
|
|
module_main = __import__(_name)
|
|
|
|
# 检查模块是否符合规范
|
|
if not hasattr(module_main,'main'):
|
|
return public.returnMsg(False,'指定模块入口文件不符合规范')
|
|
|
|
# 实例化模块类
|
|
module_obj = getattr(module_main,'main')()
|
|
|
|
# 检查方法是否存在
|
|
if not hasattr(module_obj,def_name):
|
|
return public.returnMsg(False,'在[%s]模块中找不到[%s]方法' % (module_name,def_name))
|
|
|
|
if 'module_get_object' in args and args.module_get_object == 1:
|
|
return getattr(module_obj,def_name)
|
|
|
|
# 执行方法
|
|
return getattr(module_obj,def_name)(args)
|
|
except SyntaxError as ex:
|
|
return public.returnMsg(False,'指定模块不兼容当前操作系统')
|
|
except Exception as ex:
|
|
public.print_error()
|
|
return public.returnMsg(False,'指定模块不存在')
|
|
|
|
|
|
def get_plugin_list(upgrade_force = False):
|
|
'''
|
|
@name 获取插件列表
|
|
@param upgrade_force<bool> 是否强制重新获取列表
|
|
@return dict
|
|
'''
|
|
|
|
api_root_url = 'https://api.bt.cn'
|
|
api_url = api_root_url+ '/panel/get_plugin_list'
|
|
panel_path = public.get_panel_path()
|
|
data_path = os.path.join(panel_path,'data')
|
|
|
|
if not os.path.exists(data_path):
|
|
os.makedirs(data_path,384)
|
|
|
|
plugin_list = {}
|
|
plugin_list_file = os.path.join(data_path,'plugin_list.json')
|
|
if os.path.exists(plugin_list_file) and not upgrade_force:
|
|
plugin_list_body = public.readFile(plugin_list_file)
|
|
try:
|
|
plugin_list = json.loads(plugin_list_body)
|
|
except:
|
|
plugin_list = {}
|
|
|
|
if not os.path.exists(plugin_list_file) or upgrade_force or not plugin_list:
|
|
try:
|
|
res = public.HttpGet(api_url)
|
|
except Exception as ex:
|
|
raise public.error_conn_cloud(str(ex))
|
|
if not res: raise Exception(False,'云端插件列表获取失败')
|
|
|
|
plugin_list = json.loads(res)
|
|
if type(plugin_list)!=dict or 'list' not in plugin_list:
|
|
if type(plugin_list)==str:
|
|
raise Exception(plugin_list)
|
|
else:
|
|
raise Exception('云端插件列表获取失败')
|
|
public.writeFile(plugin_list_file,json.dumps(plugin_list))
|
|
|
|
return plugin_list
|
|
|
|
|
|
def start_total():
|
|
'''
|
|
@name 启动统计服务
|
|
@return dict
|
|
'''
|
|
pass
|
|
|
|
def get_soft_list(args):
|
|
'''
|
|
@name 获取软件列表
|
|
@param args<dict_obj> 参数对像
|
|
@return dict
|
|
'''
|
|
pass
|
|
|
|
def db_encrypt(data):
|
|
'''
|
|
@name 数据库加密
|
|
@param args<dict_obj> 参数对像
|
|
@return dict
|
|
'''
|
|
try:
|
|
key = __get_db_sgin()
|
|
iv = __get_db_iv()
|
|
str_arr = data.split('\n')
|
|
res_str = ''
|
|
for data in str_arr:
|
|
if not data: continue
|
|
res_str += __aes_encrypt(data, key, iv)
|
|
except:
|
|
res_str = data
|
|
result = {
|
|
'status' : True,
|
|
'msg' : res_str
|
|
}
|
|
return result
|
|
|
|
def db_decrypt(data):
|
|
'''
|
|
@name 数据库解密
|
|
@param args<dict_obj> 参数对像
|
|
@return dict
|
|
'''
|
|
try:
|
|
key = __get_db_sgin()
|
|
iv = __get_db_iv()
|
|
str_arr = data.split('\n')
|
|
res_str = ''
|
|
for data in str_arr:
|
|
if not data: continue
|
|
res_str += __aes_decrypt(data, key, iv)
|
|
except:
|
|
res_str = data
|
|
result = {
|
|
'status' : True,
|
|
'msg' : res_str
|
|
}
|
|
return result
|
|
|
|
def __get_db_sgin():
|
|
keystr = '3gP7+k_7lSNg3$+Fj!PKW+6$KYgHtw#R'
|
|
key = ''
|
|
for i in range(31):
|
|
if i & 1 == 0:
|
|
key += keystr[i]
|
|
return key
|
|
|
|
def __get_db_iv():
|
|
div_file = "{}/data/div.pl".format(public.get_panel_path())
|
|
if not os.path.exists(div_file):
|
|
str = public.GetRandomString(16)
|
|
str = __aes_encrypt_module(str)
|
|
div = public.get_div(str)
|
|
public.WriteFile(div_file, div)
|
|
if os.path.exists(div_file):
|
|
div = public.ReadFile(div_file)
|
|
div = __aes_decrypt_module(div)
|
|
else:
|
|
keystr = '4jHCpBOFzL4*piTn^-4IHBhj-OL!fGlB'
|
|
div = ''
|
|
for i in range(31):
|
|
if i & 1 == 0:
|
|
div += keystr[i]
|
|
return div
|
|
|
|
def __aes_encrypt_module(data):
|
|
key = 'Z2B87NEAS2BkxTrh'
|
|
iv = 'WwadH66EGWpeeTT6'
|
|
return __aes_encrypt(data, key, iv)
|
|
|
|
def __aes_decrypt_module(data):
|
|
key = 'Z2B87NEAS2BkxTrh'
|
|
iv = 'WwadH66EGWpeeTT6'
|
|
return __aes_decrypt(data, key, iv)
|
|
|
|
def __aes_decrypt(data, key, iv):
|
|
from Crypto.Cipher import AES
|
|
import base64
|
|
encodebytes = base64.decodebytes(data.encode('utf-8'))
|
|
aes = AES.new(key.encode('utf-8'), AES.MODE_CBC, iv.encode('utf-8'))
|
|
de_text = aes.decrypt(encodebytes)
|
|
unpad = lambda s: s[0:-s[-1]]
|
|
de_text = unpad(de_text)
|
|
return de_text.decode('utf-8')
|
|
|
|
def __aes_encrypt(data, key, iv):
|
|
from Crypto.Cipher import AES
|
|
import base64
|
|
data = (lambda s: s + (16 - len(s) % 16) * chr(16 - len(s) % 16).encode('utf-8'))(data.encode('utf-8'))
|
|
aes = AES.new(key.encode('utf8'), AES.MODE_CBC, iv.encode('utf8'))
|
|
encryptedbytes = aes.encrypt(data)
|
|
en_text = base64.b64encode(encryptedbytes)
|
|
return en_text.decode('utf-8')
|
|
|
|
def plugin_end():
|
|
'''
|
|
@name 插件到期处理
|
|
@return dict
|
|
'''
|
|
pass
|
|
|
|
def daemon_task():
|
|
'''
|
|
@name 后台任务守护
|
|
@return dict
|
|
'''
|
|
pass
|
|
|
|
def daemon_panel():
|
|
'''
|
|
@name 面板守护
|
|
@return dict
|
|
'''
|
|
pass
|
|
|
|
def flush_auth_key():
|
|
'''
|
|
@name 刷新授权密钥
|
|
@return dict
|
|
'''
|
|
pass
|
|
|
|
def get_auth_state():
|
|
'''
|
|
@name 获取授权状态
|
|
@return 返回:0.免费版 1.专业版 2.企业版 -1.获取失败
|
|
'''
|
|
try:
|
|
softList = get_plugin_list()
|
|
if softList['ltd'] > -1:
|
|
return 2
|
|
elif softList['pro'] > -1:
|
|
return 1
|
|
else:
|
|
return 0
|
|
except:
|
|
return -1
|
|
|
|
|