Python一些可能用的到的函数系列124 GlobalFunc

说明

GlobalFunc是算网的下一代核心数据处理基础。

算网是一个分布式网络,为了能够实现真的分布式计算(加快大规模任务执行效率),以及能够在很长的时间内维护不同版本的计算方法,需要这样一个对象/服务来支撑。GlobalFunc支持通过web请求来完成计算,通常要求输入和输出都是可json序列化的;同时也支持作为一个本地对象被引入和使用。

另外一个核心点是智能化。智能化的一个前提是代理化,而不是直接调用。GlobalFunc允许使用参数化字符串来声明调用,这样每一次的操作是可以被其他算法/程序解读的。

内容

本次讨论4个主题:存储形态、基本操作、使用场景以及服务化。

1 存储形态

git项目

首先讨论GF的存储形态。传统的python包是以文件形式组织的,加上__init__.py文件指示就可以。GF本质上也是这样的,不过项目文件夹同时也是一个git项目文件夹,这意味着GF可以利用git的诸多特性:

  • 1 分支。通过分支可以针对不同的项目或者探索进行微调,甚至可以支持未来由agent来进行微调实验。
  • 2 比较(暂未使用)。git可以具备代码比较的能力,方便查探差异。

当前的GF可以视为一个文件夹,下面有若干子文件夹。每个子文件夹是一个独立的包,下面有若干py文件,每个文件的文件名和函数名是一致的。

如果在其他机器上拉取了该项目,那么对应的代码是一致的。当然,这仍然需要对应的环境支持才可以。所以一般来说没有必要去拉取项目,而是使用某个镜像。

除了文件形态,GF采用数据库存储,这种数据存储又是由Redis和Mongo两部分构成的。长期的持久化由Mongo完成,而零碎的取数由Redis完成。

RedisOrMongo本来的设计是为了通用的,大量的,不同程序间的数据存储服务的。为了区别不同程序的缓存,ROM约定了k(键)的命名规范,这个对于区分不同的变量非常有用。

如下,按三段命名法,k由三部分构成。总体上三段分别对应,项目、子项目、变量。或者从数据库上理解,三段分别是数据库、数据表和记录。
在这里插入图片描述
本次的存储:

  • 1 tier1: sp_GlobalFunc 这个是GF的空间
  • 2 tier2: Base_master 通过模块名+分支构成了第二级名称
  • 3 tier2: 函数名

基于数据库的存储带来了很多便利:

  • 1 首先,数据库方式允许了更智能的搜索。很多时候重复建设的根源就是在于找不到过去开发的结果。

一个有趣的实操是和chatgpt结合起来:将函数写好后,发给gpt解读,形成文本。

import os
def popen(some_cmd):with os.popen(some_cmd)  as f:res = f.read()return res
def popen1(some_cmd):with os.popen(some_cmd)  as f:res = f.read()return res.split()[0]def get_machine_name(given_name = None):if given_name is None :try:default_name = popen1('cat /etc/hostname')except:default_name = 'xx'else:default_name = given_namereturn default_name'''
你的 `popen` 函数用于执行命令并返回命令的输出,而 `popen1` 函数执行命令并返回输出的第一个单词。这些函数可以用于执行系统命令并获取输出。`get_machine_name` 函数用于获取机器名,默认情况下它尝试从 `/etc/hostname` 文件中读取机器名,如果失败,则返回 'xx'。如果提供了 `given_name` 参数,它将使用该参数作为机器名。这些函数看起来能够满足获取机器名和执行系统命令的需求。如果你有任何特定问题或需要进一步的帮助,可以随时提出。'''

因为这些文本已经全部在数据库中了,因此很方便进一步将其处理为向量,或者直接使用es进行搜索。

这样的本质是帮助使用者面对信息过载,其实也就是在推荐。泛推荐类算法将是我2024年的一个主要方向。

2 基本操作对象封装

以下讨论关于GF的基本操作。

例如查看当前分支、扫描文件变动、简单git提交、包的初始化、存储函数信息、生成函数、列出包的所有函数。

测试应用是参数化调用。

开发和执行两部分需要分开,假设GlobalFunc是一个git项目,同时也是顶级目录和函数包(有init文件)。在这个项目下有若干子文件夹,每个文件夹也是一个函数包。

开发时要切入到GlobalFunc目录下,而执行时则保持与GlobalFunc平级。

2.1 开发

对象将一些功能整合到一起

# 依赖 RedisOrMongo -> WMongo
# tier1 是sp+项目名
class GFBase:def __init__(self, project_path = None, redis_agent_host = 'http://172.17.0.1:24021/',redis_connection_hash = None,tier1 = None):self.project_path = project_pathself.redis_cfg = Naive()self.redis_cfg.redis_agent_host = redis_agent_hostself.redis_cfg.redis_connection_hash = redis_connection_hashself.tier1 = tier1# 查看当前分支def _get_current_branch(self):return get_current_branch(self.project_path)# 扫描目录下的所有文件信息def _get_scan_files(self):return scan_directory(self.project_path)# 简单提交gitdef _simple_commit_git(self, commit_message = None):git_commit_and_push(commit_message, self.project_path)# 为包增加初始文件:针对CreateOrUpdate一个包下的函数def _generate_init_py(self, packname = None):package_dir = '/'.join([self.project_path, packname])generate_init_py(package_dir)# 保存一个文件到ROM(RedisOrMongo) some_k 就是「包.函数名」def _save_a_file_rom(self, some_k):# 校验,应该包含两个点some_func = some_ksome_func_list = some_func.split('.')current_branch = self._get_current_branch()scan_dict = self._get_scan_files()tier1 = self.tier1if len(some_func_list) == 3:print('长度为3段,正确')tier2 = some_func_list[0] + '_' + current_branchvar_space_name = '.'.join([tier1,tier2])var_name = some_func_list[1]is_new = not verify_redis_var_name(var_space_name)rom = RedisOrMongo(var_space_name, self.redis_cfg.dict(),backend='mongo', mongo_servername='m7.24065', is_new = is_new)# 持久化存储 # some_func_jstr = json.dumps(scan_dict[some_func]) # 不必压缩rom.setx(var_name,  scan_dict[some_func], persist=True)# 生成一个文件到指定位置def _gen_a_file(self,func_name = None, target_folder = None, tier1 = None, pack_name = None, branch_name ='master'):tier1 = self.tier1 or tier1 assert  all([func_name,target_folder,pack_name]), 'unc_name,target_folder,pack_name 不为空'tier2 = '_'.join([pack_name, branch_name])the_space_name = '.'.join([tier1,tier2])rom = RedisOrMongo(the_space_name, self.redis_cfg.dict(),backend='mongo', mongo_servername='m7.24065')        # 获取 meta,data : data就是代码字符the_data = rom.getx(func_name)target_folder1 = '/'.join([target_folder,  pack_name])os.makedirs(target_folder1, exist_ok=True)filename = the_data['meta']['name']filedata = the_data['data']create_file(target_folder1, filename, filedata)# 列出包的所有函数def _list_file_names_without_extension(self, pack_name):pack_path = self.project_path + pack_namereturn list_file_names_without_extension(pack_path)# 重载包def _reload_package(self, pack_name= None):reload_package(pack_name)# 初始化
git_project_path = './GlobalFunc/'  # 切到GlobalFunc下执行
tier1 = 'sp_GlobalFunc' # 对应rom的命名规范
redis_agent_host = 'http://公网IP:24021/' # 需要通过微服务执行
gfbase = GFBase(project_path = git_project_path,redis_agent_host = redis_agent_host,tier1 = tier1 )

来看具体的应用和功能代码

2.1.1 查看当前分支

通过subprocess 调用git 命令查看分支,这与具体的应用相关。目前都是master。

gfbase._get_current_branch()
# ----
import subprocess
# 获取当前分支
def get_current_branch(directory=None):try:# 构建 git 命令git_command = ['git', 'rev-parse', '--abbrev-ref', 'HEAD']if directory:# 如果指定了目录,则将命令添加到目录中执行git_command = ['git', '-C', directory, 'rev-parse', '--abbrev-ref', 'HEAD']# 运行 git 命令获取当前分支result = subprocess.check_output(git_command)# 将结果解码为字符串并去除换行符current_branch = result.decode('utf-8').strip()return current_branchexcept subprocess.CalledProcessError:# 如果出现错误,例如不在 Git 仓库中,返回 Nonereturn None# 例子:获取当前分支(在指定的目录下执行)
# current_branch = get_current_branch(directory='/path/to/your/git/repository')
2.1.2 扫描所有文件的信息

对项目下的所有py文件进行扫描,提取元数据,并将代码保存为数据。这个操作是向数据提交函数的前提。

scan_dict = gfbase._get_scan_files()
# ----
import os
import hashlib
import time
import json# 计算文件的 MD5 哈希值
def calculate_file_hash(file_path):md5 = hashlib.md5()with open(file_path, "rb") as f:while True:data = f.read(65536)if not data:breakmd5.update(data)return md5.hexdigest()def get_line_count(file_path):with open(file_path, "r", encoding="utf-8") as f:return sum(1 for line in f)def get_file_metadata(file_path):metadata = {}try:stat = os.stat(file_path)metadata["hash"] = calculate_file_hash(file_path)# python似乎没法正确读取create_time, linux的stat命令可以显示正确时间,但是不同的系统间格式又不同,算了# metadata["created_time"] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(stat.st_ctime))metadata["modified_time"] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(stat.st_mtime))metadata["line_count"] = get_line_count(file_path)except Exception as e:print(f"Error getting metadata for {file_path}: {str(e)}")return metadatadef get_file_data(file_path):data = Nonetry:with open(file_path, "r") as f:content = f.read()# 使用 json.dumps 将文件内容序列化为 JSON 格式data = json.dumps(content)except Exception as e:print(f"Error getting data for {file_path}: {str(e)}")return datadef build_key(root= None, xfile=None,directory_path = None, base_directory = None):relative_path = os.path.relpath(root, directory_path)relative_path = relative_path.replace('.', '')  # 将 . 替换为空字符串if relative_path == '.':return xfileelse:return f"{base_directory}.{relative_path.replace(os.path.sep, '.')}.{xfile}"def scan_directory(directory_path):result = {}base_directory = os.path.basename(directory_path)# 遍历指定目录及其子目录for root, dirs, files in os.walk(directory_path):# 过滤以 . 开头的文件夹 和以下划线开头的文件夹dirs[:] = [d for d in dirs if not d.startswith('.') and not d.startswith('_')]for file in files:if not file.startswith(".") and not file.startswith("_") and file != "__init__.py" and not file.endswith('.pkl'):file_path = os.path.join(root, file)key = build_key(root, file, directory_path, base_directory)key = key.lstrip('.')value = {}value["meta"] = get_file_metadata(file_path)value["meta"]['name'] = filedata = get_file_data(file_path)if data is not None:value["data"] = dataresult[key] = valuereturn result# scan_dict = scan_directory('/Users/yukai/m4git/GlobalFunc/')
n [2]: scan_dict = gfbase._get_scan_files()In [3]: scan_dict.keys()
Out[3]: dict_keys(['WMongo_V9000_012.py', 'remark.md', 'test.md', 'RedisOrMongo_v100.py', 'debug_pys.d010_参数化调用.py', 'debug_pys.d002_扫描信息.py', 'debug_pys.d008_列出某个包的所有函数.py', 'debug_pys.d006_存储一个函数信息.py',...In [4]: scan_dict['Base.inverse_time_str.py']
Out[4]:
{'meta': {'hash': 'df80f0d4afec7ce0aa0ef00d7bc536ee','modified_time': '2023-10-27 15:25:39','line_count': 11,'name': 'inverse_time_str.py'},'data': '"import time\\n\\ndef inverse_time_str(time_str  = None, bias_hours = 0):\\n    ts = time.mktime (time.strptime(time_str,\'%Y-%m-%d %H:%M:%S\')) + bias_hours* 3600\\n    return ts \\n\\n\'\'\'\\n\\u4f60\\u7684 inverse_time_str \\u51fd\\u6570\\u7528\\u4e8e\\u5c06\\u65f6\\u95f4\\u5b57\\u7b26\\u4e32\\u8f6c\\u6362\\u56de\\u65f6\\u95f4\\u6233\\uff0c\\u8003\\u8651\\u4e86\\u65f6\\u533a\\u504f\\u79fb\\u3002\\u8be5\\u51fd\\u6570\\u63a5\\u53d7\\u4e00\\u4e2a\\u65f6\\u95f4\\u5b57\\u7b26\\u4e32\\u548c\\u504f\\u79fb\\u5c0f\\u65f6\\u6570\\uff0c\\u5e76\\u8fd4\\u56de\\u4e00\\u4e2a\\u65f6\\u95f4\\u6233\\uff08\\u4ee5\\u79d2\\u4e3a\\u5355\\u4f4d\\uff09\\u3002\\n\\n\\u4f60\\u7684\\u51fd\\u6570\\u5b9e\\u73b0\\u770b\\u8d77\\u6765\\u662f\\u6b63\\u786e\\u7684\\uff0c\\u5b83\\u4f7f\\u7528\\u4e86 time.mktime \\u548c time.strptime \\u6765\\u6267\\u884c\\u5b57\\u7b26\\u4e32\\u5230\\u65f6\\u95f4\\u6233\\u7684\\u8f6c\\u6362\\u3002\\u504f\\u79fb\\u5c0f\\u65f6\\u6570\\u88ab\\u8003\\u8651\\u5728\\u5185\\uff0c\\u4ee5\\u4fbf\\u6839\\u636e\\u9700\\u8981\\u8c03\\u6574\\u65f6\\u95f4\\u6233\\u3002\\n\'\'\'"'}
2.1.3 提交git项目

当开发作为git项目时,需要进行提交。这个在本地分发或者是在分支开发时有用。

gfbase._simple_commit_git()In [5]: gfbase._simple_commit_git()
[master cca9732] None 2024-02-13 13:53:033 files changed, 1 insertion(+)create mode 100644 __pycache__/__init__.cpython-39.pyc
Enumerating objects: 16, done.
Counting objects: 100% (16/16), done.
Delta compression using up to 10 threads
Compressing objects: 100% (9/9), done.
Writing objects: 100% (9/9), 837 bytes | 837.00 KiB/s, done.
Total 9 (delta 7), reused 0 (delta 0), pack-reused 0
To ssh://101.89.161.51:10600/home/git/GlobalFunc.gitdee7abd..cca9732  master -> master
Git操作成功完成。import subprocess
from datetime import datetimedef git_commit_and_push(commit_message= '简单提交', git_directory='.'):try:current_datetime = datetime.now().strftime("%Y-%m-%d %H:%M:%S")full_commit_message = f"{commit_message} {current_datetime}"# 添加所有更改subprocess.run(['git', '-C', git_directory, 'add', '.'])# 提交更改subprocess.run(['git', '-C', git_directory, 'commit', '-m', full_commit_message])# 推送到远程仓库subprocess.run(['git', '-C', git_directory, 'push'])print("Git操作成功完成。")except subprocess.CalledProcessError as e:print("Git操作失败:", e)
2.1.4 刷新一个包的初始化文件

在增加了函数之后,这几乎是一定要执行的。增加函数 -> 刷新包初始化文件 -> git 提交,这是一个固定流程。

gfbase._generate_init_py('Base')以下自动为包增加了这行语句,从而导入新开发的函数
...
from .test1_save_test import test1_save_testimport os
def generate_init_py(package_dir):if not os.path.exists(package_dir):print(f"Package directory '{package_dir}' not found.")returnwith open(os.path.join(package_dir, "__init__.py"), "w") as init_py:for filename in os.listdir(package_dir):if filename.endswith(".py") and not filename.startswith("_") and not filename.startswith("."):module_name = filename[:-3]  # Remove the .py extensioninit_py.write(f"from .{module_name} import {module_name}\n")
2.1.5 尝试存储一个函数

除了以文件方式增加持久化的方式之外,还需要将函数持久化到数据库。

Base.test1_save_test.py
def test1_save_test():print('test1_save_test')gfbase._save_a_file_rom('Base.test1_save_test.py')In [7]: gfbase._save_a_file_rom('Base.test1_save_test.py')
长度为3段,正确
【Loading cur_w】from pickle
1.当前使用的MongeAgent:http://公网IP:24011/
2.Tier1:NameSpace, Tier2:RedisSpace
3.ConnectionHash:6552982f4bcd003477c5e2170250ccce
4.FilterDict:{'_is_enable': 1}
5.Limits:10000
6.Sort:
7.Skip:0
约定空间名称以sp_开头,节点以node_开头,临时变量以tem_开头
【提供节点与子图的元数据记录】
【Loading cur_w】from pickle
【Loading cur_w】from pickle
ok sp_GlobalFunc.Base_master  存在,可以使用
description
2.1.6 读取并生成一个函数 - 需要伴随一个包的初始化文件

这个功能的应用场景是在某个新的位置,重现(可能是部分)包函数。

target_folder = '~/Desktop/test_func'
gfbase._gen_a_file(func_name='from_pickle',target_folder=target_folder,pack_name='Base')【提供节点与子图的元数据记录】
【Loading cur_w】from pickle
【Loading cur_w】from pickle
ok sp_GlobalFunc.Base_master  存在,可以使用
description
文件 'from_pickle.py' 已成功创建于路径 '~/Desktop/test_func/Base'import os
def create_file(path, filename, content):# 组合完整的文件路径file_path = os.path.join(path, filename)try:# 打开文件以写入内容with open(file_path, 'w') as file:file.write(content)print(f"文件 '{filename}' 已成功创建于路径 '{path}'")except Exception as e:print(f"创建文件时出现错误:{e}")
2.1.7 列出包的所有函数

通过扫描文件夹下文件列表的方式列出所有函数。

gfbase._list_file_names_without_extension('Base')
{'Naive','WMongo','__init__','clear_digits','color_print','cols2s','create_folder','flat_dict',
...import os 
def list_file_names_without_extension(directory):return set(os.path.splitext(file)[0] for file in os.listdir(directory) if os.path.isfile(os.path.join(directory, file)))
2.1.8 重载一个包

在调试时可以更新然后重载

gfbase._reload_package('Base')
成功重新加载模块: Base
import subprocess
import importlib.utildef reload_package(package_name):try:# 检查包是否已经被导入spec = importlib.util.find_spec(package_name)if spec is None:# 如果包未导入,先尝试导入它print(f"警告:未找到包 '{package_name}',尝试导入...")exec(f"import {package_name}")# 构造重新加载模块的命令reload_command = f'python -c "import importlib; import {package_name}; importlib.reload({package_name}); print(\'成功重新加载模块: {package_name}\')"'# 使用 subprocess 调用命令subprocess.run(reload_command, shell=True)except subprocess.CalledProcessError as e:print("重新加载模块失败:", e)

2.2 执行

需要切换到GlobalFunc的同级目录执行

2.2.1 位置参数调用

这种方式将逐渐被弃用。位置参数主要是方便,但是在函数复杂时并不方便。另外,既然采用间接方式调用,那么以关键字参数显然更加清晰。

import subprocess
import importlib.utildef reload_package(package_name):try:# 检查包是否已经被导入spec = importlib.util.find_spec(package_name)if spec is None:# 如果包未导入,先尝试导入它print(f"警告:未找到包 '{package_name}',尝试导入...")exec(f"import {package_name}")# 构造重新加载模块的命令reload_command = f'python -c "import importlib; import {package_name}; importlib.reload({package_name}); print(\'成功重新加载模块: {package_name}\')"'# 使用 subprocess 调用命令subprocess.run(reload_command, shell=True)except subprocess.CalledProcessError as e:print("重新加载模块失败:", e)# 用于GlobalFunc参数化获取函数 func_name like 
def get_func_by_para(some_func_pack, full_func_name):pack,funcname = full_func_name.split('.')the_pack = getattr(some_func_pack,pack)the_func = getattr(the_pack, funcname)return the_func# 在包的父目录执行,把GlobalFunc当成总包 | 在编辑的时候,我们会沉入到内部操作,在应用时则要挂在最外层
# import GlobalFunc as funcs
# the_func = get_func_by_para(funcs, 'Base.to_pickle')class GFGo:def __init__(self):pass # 执行 pack_func ~ Base.to_pickle@staticmethoddef _exe_func_args(global_func = None, pack_func = None, args = None):exe_func = get_func_by_para(global_func, pack_func)return exe_func(*args)@staticmethoddef _exe_func_kwargs(global_func = None, pack_func = None, kwargs = None):exe_func = get_func_by_para(global_func, pack_func)return exe_func(**kwargs)# 重载包@staticmethoddef _reload_package(package_name):reload_package(package_name)gfgo = GFGo()

以下执行一个函数,将一个数据 'a’存储为pickle, 文件名为‘a_data.pkl’

import GlobalFunc as funcs
pack_func = 'Base.to_pickle'
args1 = ['a', 'a_data', './']
gfgo._exe_func_args(global_func =funcs,pack_func= pack_func, args = args1)In [4]: import GlobalFunc as funcs...: pack_func = 'Base.to_pickle'...: args1 = ['a', 'a_data', './']...: gfgo._exe_func_args(global_func =funcs,pack_func= pack_func, args = args1)...:
data save to pickle:  ./a_data.pkl
2.2.2 关键字参数调用

这是未来的主流(规范):约定GlobalFunc函数的参数都以关键字参数

有一个工程是逐步将以前用位置参数的函数全部进行改造。

原函数

import pickle
def to_pickle(data, file_name,  path='./'):output = open(path+file_name+'.pkl', 'wb')pickle.dump(data, output)output.close()print('data save to pickle: ', path+file_name+'.pkl')

改造后

import pickle
def to_pickle(data = None, file_name = None,  path='./'):output = open(path+file_name+'.pkl', 'wb')pickle.dump(data, output)output.close()print('data save to pickle: ', path+file_name+'.pkl')

重新调用

# 重载包
In [10]: gfgo._reload_package('GlobalFunc')...:
成功重新加载模块: GlobalFuncIn [11]: pack_func = 'Base.to_pickle'...: kwargs1 = {'data':'a',...:             'file_name':'a_data',...:             'path':'./'}
In [12]: gfgo._exe_func_kwargs(global_func =funcs,pack_func= pack_func, kwargs = kwargs1)
data save to pickle:  ./a_data.pkl

将这个函数也刷新到数据库,重新切回gfbase

gfbase._save_a_file_rom('Base.to_pickle.py')

到这里,基础功能就算告一段落。

3 场景及操作对象封装

先只讨论一种场景,就是镜像场景

镜像场景是指使用docker容器开发,完成后封装为镜像使用。由于镜像是连同对应的开发环境一并封装的,开箱即用,所以是一个更成熟的方式。

简单来说,就是打开一个容器,将GlobalFunc文件夹拷贝进去,然后将操作对象,例如GFBase或者GFGo拷贝进去,就可以使用了。

源镜像(registry.cn-hangzhou.aliyuncs.com/andy08008/pytorch_jupyter:v205)是我之前维护的一个常用镜像,镜像装有gpu版 pytorch1.x,并安装了一些其他有用的包。

现在基于源镜像构建,目标是myregistry.domain.com:24052/worker.andy.globalfunc:v101

3.1 建立新容器

docker run -it registry.cn-hangzhou.aliyuncs.com/andy08008/pytorch_jupyter:v205 bash

3.2 拷贝GlobalFunc包

在算网机上先执行拉取git pull,更新GlobalFunc项目
注意m7.24065.pkl mymeta.pkl两个数据库连接文件要根据不同的情况刷新一下。

在宿主机执行
docker cp /opt/GlobalFunc df5a340570ab:/workspace/
此时

root@df5a340570ab:/workspace# ls
funcs_apifunc_database_model1_6810f9d37e89e5e1f33e1b8f4defa22e.so  GlobalFunc  test.ipynb

3.3 拷贝GFGo对象

这里只考虑执行的情况

import subprocess
import importlib.utildef reload_package(package_name):try:# 检查包是否已经被导入spec = importlib.util.find_spec(package_name)if spec is None:# 如果包未导入,先尝试导入它print(f"警告:未找到包 '{package_name}',尝试导入...")exec(f"import {package_name}")# 构造重新加载模块的命令reload_command = f'python -c "import importlib; import {package_name}; importlib.reload({package_name}); print(\'成功重新加载模块: {package_name}\')"'# 使用 subprocess 调用命令subprocess.run(reload_command, shell=True)except subprocess.CalledProcessError as e:print("重新加载模块失败:", e)# 用于GlobalFunc参数化获取函数 func_name like 
def get_func_by_para(some_func_pack, full_func_name):pack,funcname = full_func_name.split('.')the_pack = getattr(some_func_pack,pack)the_func = getattr(the_pack, funcname)return the_func# 在包的父目录执行,把GlobalFunc当成总包 | 在编辑的时候,我们会沉入到内部操作,在应用时则要挂在最外层
# import GlobalFunc as funcs
# the_func = get_func_by_para(funcs, 'Base.to_pickle')class GFGo:def __init__(self):pass # 执行 pack_func ~ Base.to_pickle@staticmethoddef _exe_func_args(global_func = None, pack_func = None, args = None):exe_func = get_func_by_para(global_func, pack_func)return exe_func(*args)@staticmethoddef _exe_func_kwargs(global_func = None, pack_func = None, kwargs = None):exe_func = get_func_by_para(global_func, pack_func)return exe_func(**kwargs)# 重载包@staticmethoddef _reload_package(package_name):reload_package(package_name)

3.4 提交保存

docker commit df5a340570ab myregistry.domain.com:24052/worker.andy.globalfunc:v101

docker push myregistry.domain.com:24052/worker.andy.globalfunc:v101

小插曲

自建的docker 镜像仓库需要使用证书到期了,因此要重新刷新一下。

制作一个10年的证书

openssl req \-newkey rsa:4096 -nodes -sha256 -keyout /home/certs/domain.key \-addext "subjectAltName = DNS:myregistry.domain.com" \-x509 -days 3650 -out /home/certs/domain.crt

制作好证书后进行分发和授信。

# 1)安装工具包
apt-get install -y ca-certificates
# 2)复制ca.crt
cp /home/certs/domain.crt /usr/local/share/ca-certificates/domain.crt
# 3)更新被信任的CA证书
update-ca-certificates
# 4)重启本地dockerd(此步骤是必须的,否则依然报错x509: certificate signed by unknown authority)
systemctl daemon-reload
systemctl restart docker

要注意的是,镜像仓库的服务由于要存储镜像,需要较大的空间,因此项目的启动位置会存放在存储空间较大的位置。

3.5 调用执行

启动容器

docker run -it  --rm myregistry.domain.com:24052/worker.andy.globalfunc:v101 bash 

执行命令

from GFGo import GFGo
import GlobalFunc as funcs
gfgo = GFGo()
pack_func = 'Base.to_pickle'
kwargs1 = {'data':'a', 'file_name':'a_data', 'path':'./'}
gfgo._exe_func_kwargs(global_func =funcs,pack_func= pack_func, kwargs = kwargs1)data save to pickle:  ./a_data.pkl

4 服务化

GlobalFunc有两种使用方式:本地模式和API模式,其中本地模式也就是普通的包调用模式,而API模式是本次要讨论的模式。

由于每个GlobalFunc的容器都会有资源限制,且由于计算资源的分布式化,未来其容器(分身)将会呈现随机化的态势。

GlobalFunc本身是为了提供无差别的函数服务,所以端口将采取分配一个地址段的方式,而不是按照规则指定。端口区间采用26000~27000。

一个容器将由所在的宿主机、镜像版本、内存上限、显存上限以及端口号决定。例如: m7_v101_30G_0G_26000表示在m7上运行的容器,采用v101版本,最大内存为30G,最大显存0G(无显卡),占用端口26000.

服务会分为两部分,服务端与客户端。

服务仅接受和返回json字符串,客户端完成格式转换。

所以服务端承担的功能相对简单,而客户端则需要处理相对复杂的翻译功能。另外,服务端所执行的必然是有返回的函数操作,而不能是本地文件操作。例如使用to_pickle,文件将会存在服务端的文件系统。

服务端采用tornado搭建,一方面其格式非常简单,另一方面效率非常高。

安装了tornado包之后,只需要两个文件就可以启动server了。

文件1: server_funcs.py

过去,我会把服务需要的函数放在这里,现在的话,理论上可以不需要,因为所有的依赖函数都会在GlobalFunc包中。

不过从设计的角度,我认为仍然应该保留这个文件(即使是空的)。因为未来有可能有一些服务的配置,以及一些固定的数据库连接可以放在这里。这样会比较便于管理,避免反复建立数据库连接。

文件2: server.py

在当前容器中,必然存在GFGo.pyGlobalFunc, 在server.py中引入他们。同时也存在,即使是空的server_funcs.py。

from server_funcs import * 
from GFGo import GFGo
import GlobalFunc as funcs 

一个启动示例

from server_funcs import * 
from GFGo import GFGo
import GlobalFunc as funcs import tornado.httpserver  # http服务器
import tornado.ioloop  # ?
import tornado.options  # 指定服务端口和路径解析
import tornado.web  # web模块
from tornado.options import define, options
import os.path  # 获取和生成template文件路径app_list = []IndexHandler_path = r'/'
class IndexHandler(tornado.web.RequestHandler):def get(self):self.write('【GET】This is Website for Internal API System')self.write('Please Refer to API document')print('Get got a request test')# print(buffer_dict)def post(self):request_body = self.request.bodyprint('Trying Decode Json')some_dict = json.loads(request_body)print(some_dict)msg_dict = {}msg_dict['info'] = '【POST】This is Website for Internal API System'msg_dict['input_dict'] = some_dictself.write(json.dumps(msg_dict))print('Post got a request test')
IndexHandler_tuple = (IndexHandler_path,IndexHandler)
app_list.append(IndexHandler_tuple)if __name__ == '__main__':#tornado.options.parse_command_line()apps = tornado.web.Application(app_list, **settings)http_server = tornado.httpserver.HTTPServer(apps)define('port', default=8000, help='run on the given port', type=int)http_server.listen(options.port)# 单核# 多核打开注释# 0 是全部核# http_server.start(num_processes=10) # tornado将按照cpu核数来fork进程# ---启动tornado.ioloop.IOLoop.instance().start()

启动服务

python3 server.py

本地调用

import requests as req In [4]: req.get('http://127.0.0.1:8000/').text
Out[4]: '【GET】This is Website for Internal API SystemPlease Refer to API document'In [7]: req.post('http://127.0.0.1:8000/',json = {}).json()
Out[7]: {'info': '【POST】This is Website for Internal API System', 'input_dict': {}}

参数化调用一个函数

函数用于将时间字符串转为时间戳(在统一时间轴之下,已经不再需要用这个函数)

import timedef inverse_time_str(time_str  = None, bias_hours = 0):ts = time.mktime (time.strptime(time_str,'%Y-%m-%d %H:%M:%S')) + bias_hours* 3600return ts '''
你的 inverse_time_str 函数用于将时间字符串转换回时间戳,考虑了时区偏移。该函数接受一个时间字符串和偏移小时数,并返回一个时间戳(以秒为单位)。你的函数实现看起来是正确的,它使用了 time.mktime 和 time.strptime 来执行字符串到时间戳的转换。偏移小时数被考虑在内,以便根据需要调整时间戳。
'''

假设我们知道一个时间字符串,并希望知道其东八区时间的时间戳,那么发送请求

import requests as req 
pack_func = 'Base.inverse_time_str'
kwargs = {'time_str':'2024-02-15 11:11:11','bias_hours':-8}
req.post('http://127.0.0.1:8000/gfgo/',json = {'pack_func':pack_func,'kwargs':kwargs}).json()1707966671服务端:大约花了0.53ms来完成
[I 240215 08:31:48 web:2239] 200 POST /gfgo/ (127.0.0.1) 0.53ms

在这里插入图片描述

到这里,GlobalFunc的基础部分,也就是连通性部分已经完成。之后将可以基于这个基础进行下一步的开发,GF将会成为下一代CalNet的逻辑执行基础。

其他

原来在开发算网(CalNet)的时候,总是假设机器处在局域网中,但是实际上并不是。而且由于CalNet是基于微服务搭建的,网络配置应该作为一项独立的配置独立出来。在外网去调用微服务时,由于数据库操作代理服务默认使用了内网地址,在很多参数传递时又偷懒了,所以连接起来非常不方便。

Next,重新设计操作微服务的对象,通过分离配置、操作引导等方式改进体验。





本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/469793.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

第一篇【传奇开心果系列】Python的pyttsx3库技术点案例示例:文本转换语言

传奇开心果短博文系列 系列短博文目录Python的pyttsx3库技术点案例示例系列 短博文目录前言一、pyttsx3主要特点和功能介绍二、pyttsx3文字转语音操作步骤介绍三、多平台支持介绍和示例代码四、多语言支持介绍和示例代码五、自定义语言引擎介绍和示例代码六、调整语速和音量介绍…

2024年2月最新易支付系统全开源

2024.02 更新日志: 1.新增微信公众号消息提醒功能 2.重构转账付款功能,支持通过插件扩展 3.商户后台新增代付功能 4.后台新增付款记录列表 5.支付宝插件新增预授权支付 6.优化支付通道列表,支持翻页与快速复制通道 7.新增创建订单人机验…

express 定时删除 oss 垃圾图片

一: 删除垃圾图片 思路: 获取 oss 中存储的所有图片文件;获取数据库中存储的图片文件数据;对比差异,不在数据库中的 oss 图片文件即为要删除的垃圾图片。 实现: 1、获取所有 oss 文件 import OSS from…

前端秘法基础式(HTML)(第二卷)

目录 一.表单标签 1.表单域 2.表单控件 2.1input标签 2.2label/select/textarea标签 2.3无语义标签 三.特殊字符 一.表单标签 用来完成与用户的交互,例如登录系统 1.表单域 <form>通过action属性,将用户填写的数据转交给服务器 2.表单控件 2.1input标签 type…

【Python网络编程之Ping命令的实现】

&#x1f680; 作者 &#xff1a;“码上有前” &#x1f680; 文章简介 &#xff1a;Python开发技术 &#x1f680; 欢迎小伙伴们 点赞&#x1f44d;、收藏⭐、留言&#x1f4ac; Python网络编程之Ping命令的实现 代码见资源&#xff0c;效果图如下一、实验要求二、协议原理2…

Linux之多线程

目录 一、进程与线程 1.1 进程的概念 1.2 线程的概念 1.3 线程的优点 1.4 线程的缺点 1.5 线程异常 1.6 线程用途 二、线程控制 2.1 POSIX线程库 2.2 创建一个新的线程 2.3 线程ID及进程地址空间布局 2.4 线程终止 2.5 线程等待 2.6 线程分离 一、进程与线程 在…

数模.传染病模型plus

一、SIS模型 二、SIR模型 三、SIRS模型 四、SEIR模型

机器学习---规则学习(序贯覆盖、单条规则学习、剪枝优化)

1. 序贯覆盖 回归&#xff1a; 分类&#xff1a; 聚类&#xff1a; 逻辑规则&#xff1a; 读作&#xff1a;若&#xff08;文字1且文字2且...&#xff09;&#xff0c;则目标概念成立 规则集&#xff1a;充分性与必要性&#xff1b;冲突消解&#xff1a;顺序规则、缺省规则…

嵌入式Linux系统中的设备驱动开发:从设备树到驱动实现

大家好&#xff0c;今天给大家介绍嵌入式Linux系统中的设备驱动开发&#xff1a;从设备树到驱动实现&#xff0c;文章末尾附有分享大家一个资料包&#xff0c;差不多150多G。里面学习内容、面经、项目都比较新也比较全&#xff01;可进群免费领取。 在嵌入式Linux系统中&#x…

Java网络编程 双向通信

目录 网络编程实例创建客户端创建服务端测试 网络编程 Java的网络编程是Java编程语言中用于实现网络通信的一组API和工具。通过Java的网络编程&#xff0c;开发人员可以在Java应用程序中实现客户端和服务器之间的通信&#xff0c;从而构建各种网络应用。 以下是Java网络编程的…

react【六】 React-Router 路由

文章目录 1、Router1.1 路由1.2 认识React-Router1.3 Link和NavLink1.4 Navigate1.5 Not Found页面配置1.6 路由的嵌套1.7 手动路由的跳转1.7.1 在函数式组件中使用hook1.7.2 在类组件中封装高阶组件 1.8 动态路由传递参数1.9 路由的配置文件以及懒加载 1、Router 1.1 路由 1.…

Uipath 调用Python 脚本程序详解

Python 活动概述 UiPath.Python.Activities 是一个新的活动包&#xff0c;创建它是为了支持直接从工作流运行 Python 脚本和方法。 其包含以下活动&#xff1a; Python 作用域(Python Scope) - 为 Python 活动提供作用域的容器。 加载 Python 脚本(Load Python Script) - 将 P…