# 一.小工具
# 1.打开单个网页
import webbrowser
url = "https://ask.csdn.net/questions/8078459" # 将链接替换为你想要打开的实际链接
# 在默认浏览器中打开链接
webbrowser.open(url)
2
3
4
5
6
# 2.打开多个网页
import webbrowser
urls = [
"https://www.example1.com",
"https://www.example2.com",
"https://www.example3.com"
] # 用你想要打开的链接替换这些示例链接
# 循环打开每个链接
for url in urls:
webbrowser.open(url)
2
3
4
5
6
7
8
9
10
11
# 3.Notebook 中执行 Shell
# 方式一
!sh script.sh
# 方式二
import subprocess
subprocess.run(['sh', 'script.sh'])
# 方式三
%%bash
sh script.sh
2
3
4
5
6
7
8
9
10
# 4.日志详细打印
logger.error(f"QA error, question: {q}, error: {e}", exc_info=True)
# 5.全局日志
import logging
def set_debug_mode():
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger()
console_handler = logging.StreamHandler()
logger.addHandler(console_handler)
logger.setLevel(logging.DEBUG)
set_debug_mode()
2
3
4
5
6
7
8
9
10
11
12
# 6.执行 cmd 命令
if os.name == "nt":
logger.info("当前系统是 Windows")
# 执行 cmd 命令并获取输出
result = subprocess.run(['cmd', '/c', 'title 新空间代码软件开发工作室 Vx:15045666310'], capture_output=True,
text=True)
# 输出命令的执行结果
logger.info(result.stdout)
2
3
4
5
6
7
# 7.obsutil 安装
vim ~/.bash_profile
source ~/.bash_profile
vim ~/.zshrc
source ~/.zshrc
2
3
4
5
根据提供的错误信息,看起来是在执行脚本build_push.sh的过程中,出现了obsutil命令未找到的错误。您还提供了obsutil实际所在的目录为/Users/qinyingjie/Documents/software/obsutil_darwin_amd64_5.5.12/obsutil。
要解决这个问题,您可以采取以下几个步骤:
确认路径:首先,请确认
obsutil工具的实际路径是否正确。您提供的路径为/Users/qinyingjie/Documents/software/obsutil_darwin_amd64_5.5.12/obsutil,请确保这是obsutil工具的可执行文件所在的路径。添加路径到环境变量:将
obsutil所在的目录添加到系统的环境变量PATH中,这样系统就能够在任何位置找到obsutil命令。您可以通过编辑相应的配置文件来实现这一点,例如.bashrc、.bash_profile或.zshrc(具体文件名取决于您使用的 Shell)。在文件中添加以下行:export PATH="/Users/qinyingjie/Documents/software/obsutil_darwin_amd64_5.5.12:$PATH"1然后保存文件并重新启动终端或使用
source命令使更改生效。测试命令:重新运行
build_push.sh脚本,看看是否仍然报告obsutil命令未找到的错误。如果一切正常,脚本应该能够找到并执行obsutil命令。
请注意,以上步骤假设您提供的路径确实包含了可执行的obsutil命令文件。如果路径不正确或文件不可执行,您可能需要重新下载或安装obsutil工具,并确保将其正确配置到系统中。
如果问题仍然存在,建议查看脚本文件build_push.sh的内容,确保在调用obsutil命令之前没有其他错误或问题。
# 8.非异步调用异步方法
import asyncio
# 异步方法
async def async_function():
# 执行异步操作
await asyncio.sleep(1)
return "异步方法执行完成"
# 非异步方法
def sync_function():
# 调用异步方法并获取结果
result = asyncio.run(async_function())
# 打印结果
print(result)
# 调用非异步方法
sync_function()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 9.Dockerfile 项目(python)
# 使用官方的 Python 3.11.5 镜像作为基础镜像,并选择较小的 slim 版本
FROM python:3.11.5-slim
# 设置工作目录
WORKDIR /base-agent/
# 将 requirements.txt 复制到工作目录,并使用官方的清华镜像源安装依赖
COPY requirements.txt /base-agent/
RUN pip install --upgrade pip && \
pip install -r /base-agent/requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple
# 将应用代码复制到工作目录
COPY . /base-agent/
# 清理不需要的文件,减小镜像大小
RUN rm -rf /var/lib/apt/lists/* \
&& rm -rf /base-agent/__pycache__ \
&& rm -rf /base-agent/*.pyc \
&& rm -rf /base-agent/.cache \
&& rm -rf /base-agent/*.log
# 设置容器启动时执行的命令
ENTRYPOINT ["sh", "./run.sh"]
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 二.集合操作
# 1.字典合并
有一个字典 base_config 和一个 yaml 文件,需要将 yaml 文件中存在但是 base_config 中不存在的字段添加到 base_config 中
extra_config:
- id: '1'
name: John
age: 25
- id: '2'
name: Jane
age: 30
- id: '3'
name: Alex
age: 28
2
3
4
5
6
7
8
9
10
import yaml
import json
filename = '0326/config.yaml'
with open(filename, "r", encoding="utf-8") as yaml_file:
config = yaml.safe_load(yaml_file)
# json_str = json.dumps(config)
base_config = {
'id': '1',
'name': 'test1220',
}
# 合并agent的配置,如果
if base_config is not None:
# 读取yaml的数据
extra_configs = config['extra_config']
if extra_configs is not None:
# 这里是字典
filtered_config = next(
(item_config for item_config in extra_configs if item_config['id'] == base_config['id']), None)
if filtered_config is not None:
for key, value in filtered_config.items():
if key not in base_config:
base_config[key] = value
json_str = json.dumps(base_config)
print(json_str)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# 2.字符串长度
string = "方法进行确定。"
length = len(string)
print("字符串长度:", length)
2
3
4
urls = [
'11111111111'
,
'22222'
,
'33333'
]
# 循环打开每个链接
for url in urls:
print(len(url))
2
3
4
5
6
7
8
9
10
11
12
# 3.指定位置遍历
arr = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
start_index = -4
for element in arr[start_index:]:
# 执行遍历操作,对元素进行处理
print(element)
2
3
4
5
6
# 4.删除列表元素
# 使用pop删除
my_list = [1, 2, 3, 4, 5]
removed_element = my_list.pop(2)
print("被移除的元素:", removed_element)
print("列表剩余元素:", my_list)
# 使用切片删除
my_list = [1, 2, 3, 4, 5]
del my_list[:2]
print("删除后的列表:", my_list)
2
3
4
5
6
7
8
9
10
# 5.列表推导式
ids = [user_info.id for user_info in users] if users else []
# 6.字符串 '[22]' 转换为列表
要将字符串 '[22]' 转换为列表,您可以使用 eval() 函数或者使用 Python 内置的 JSON 模块进行转换。以下是两种方法的示例:
方法 1: 使用 eval() 函数
string = '[22]'
my_list = eval(string)
print(my_list)
2
3
方法 2: 使用 JSON 模块
import json
string = '[22]'
my_list = json.loads(string)
print(my_list)
2
3
4
5
# 7.字典字节串
# 创建字节串
byte_string = b'hello'
# 访问字节串中的字节
print(byte_string[0]) # 输出:104
# 将字节串解码为普通字符串
decoded_string = byte_string.decode('utf-8')
print(decoded_string) # 输出:hello
# 将普通字符串编码为字节串
encoded_string = 'world'.encode('utf-8')
print(encoded_string) # 输出:b'world'
# 字节串之间的拼接
concatenated_bytes = byte_string + encoded_string
print(concatenated_bytes) # 输出:b'helloworld'
# 字典字节字符串
dictionary = {b'relationship': Value(sVal=b'\xe5\x8c\x85\xe6\x8b\xac')}
sVal_value = dictionary[b'relationship'].sVal.decode('utf-8')
print(sVal_value)
# 字典字节字符串
dictionary = {b'relationship': Value(sVal=b'\xe5\x8c\x85\xe6\x8b\xac')}
sVal_value = dictionary[b'relationship'].sVal
print(sVal_value)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 8.列表推导式
首先,我们需要一个函数来获取配置文件中的特定键值:
def get_config_key(key):
# 假设config_data是从配置文件中加载的数据
config_data = {
"name": [
{"type": "web", "name": "Server1"},
{"type": "db", "name": "Database1"}
]
}
return config_data.get(key, [])
2
3
4
5
6
7
8
9
这个函数get_config_key接收一个键名作为参数,并返回与该键名相关联的值。如果键不存在,它将返回一个空列表。
接下来,我们使用列表推导式来构建一个新的数据结构:
names = get_config_key("name")
config['name_server'] = [{item['type']: item['name']} for item in names]
2
在这段代码中,我们首先调用get_config_key函数来获取所有名称条目。然后,我们使用列表推导式来创建一个新的列表,其中的每个元素都是一个字典,这个字典包含服务器的类型作为键,服务器的名称作为值。
# 三.文件操作
# 1.yaml 读取
在 Python 中,我们可以使用 PyYAML 库来读取和解析 YAML 数据。首先,我们需要安装 PyYAML 库,可以使用 pip 命令进行安装:
pip install pyyaml
安装完成后,我们可以使用以下代码读取 YAML 文件并解析其中的数据:
import yaml
filename = 'data.yaml'
try:
with open(filename, 'r', encoding="utf-8") as file:
yaml_data = yaml.safe_load(file)
print(yaml_data)
except FileNotFoundError:
print(f"File '{filename}' not found.")
except yaml.YAMLError as e:
print(f"Error while loading YAML: {e}")
except Exception as e:
print(f"An error occurred: {e}")
2
3
4
5
6
7
8
9
10
11
12
13
14
yaml数据
person:
name: John Smith
age: 30
hobbies:
- reading
- hiking
2
3
4
5
6
访问方式:
print(yaml_data['person']['name']) # 输出:John Smith
print(yaml_data['person']['age']) # 输出:30
print(yaml_data['person']['hobbies']) # 输出:['reading', 'hiking']
2
3
# 2.使用方式
要在 YAML 文件中添加一个名为agent_config的数组,每个数组元素都是一个对象,对象包含id、name和age三个字段,可以使用以下示例:
agent_config:
- id: 1
name: John
age: 25
- id: 2
name: Jane
age: 30
- id: 3
name: Alex
age: 28
2
3
4
5
6
7
8
9
10
在这个示例中,agent_config是数组的键名,后面的每个缩进的部分表示数组的元素。每个元素都是一个对象,使用键值对的形式表示,其中id、name和age是字段名,后面是对应的值。
如果你想在 Python 代码中动态添加这样的数组数据并将其写入 YAML 文件,可以使用yaml库提供的方法。以下是一个示例:
import yaml
data = {
'agent_config': [
{'id': 1, 'name': 'John', 'age': 25},
{'id': 2, 'name': 'Jane', 'age': 30},
{'id': 3, 'name': 'Alex', 'age': 28}
]
}
filename = 'config.yaml'
with open(filename, 'w', encoding='utf-8') as yaml_file:
yaml.dump(data, yaml_file)
2
3
4
5
6
7
8
9
10
11
12
13
在这个示例中,我们创建了一个字典data,其中包含一个键为agent_config的数组,数组的元素是包含id、name和age字段的字典对象。然后,我们将data字典写入到名为config.yaml的 YAML 文件中。
# 3.json 数据写入 json 文件
import json
# 生成要写入的JSON数据
data = {
"name": "John",
"age": 30,
"city": "New York"
}
2
3
4
5
6
7
8
# 指定要写入的JSON文件路径
file_path = "path/to/your/file.json"
# 将JSON数据写入文件
with open(file_path, "w") as json_file:
json.dump(data, json_file)
2
3
4
5
6
# 4.读取目录下的文件
import os
def read_parquet_file(directory):
"""
读取指定目录下的所有.parquet文件
:param directory: 要搜索的目录路径
:return: 包含所有Parquet文件名的列表
"""
data = []
for root, dirs, files in os.walk(directory):
for file in files:
# 检查文件扩展名是否为.parquet
if file.endswith('.parquet'):
# 构建文件的完整路径
full_path = os.path.join(root, file)
# 读取Parquet文件,这里假设使用pandas
df = pd.read_parquet(full_path)
# 将DataFrame添加到数据列表中
data.append(df)
return data
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 5.Parquet 转 Excel
import pyarrow.parquet as pq
import pandas as pd
import os
def parquet2excel(file_name):
"""
将.parquet文件转换为Excel文件
:param file_name: Parquet文件的路径
:return: 无返回值,但会打印保存路径
"""
# 读取Parquet文件
parquet_file = pq.ParquetFile(file_name)
data = parquet_file.read().to_pandas()
# 将读取的数据转换为pandas DataFrame
df = pd.DataFrame(data)
# 构建Excel文件的保存路径
excel_path = file_name.replace('.parquet', '.xlsx')
# 确保保存路径的目录存在
os.makedirs(os.path.dirname(f'excel/{excel_path}'), exist_ok=True)
# 将DataFrame写入Excel文件
df.to_excel(f'excel/{excel_path}', index=False)
# 打印保存路径
print(f'数据已保存到 {excel_path}')
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# 6.excel 转 text 文件
import pandas as pd
def excel_insert_sql(file_path: str):
"""
根据excel内容生成insert语句
:param file_path: Excel文件的路径
:return: SQL插入语句字符串
"""
res = ''
# 如果Excel文件有多个sheet,可以通过sheet_name参数指定要读取的sheet
data = pd.read_excel(file_path, sheet_name='Sheet1')
for index, row in data.iterrows():
name = row['name']
name = name.replace('"', '')
description = row['description']
description = str(description).replace('"', '')
print(f'insert vertex entity values "{name}":("{description}");\n')
res += f'insert vertex entity values "{name}":("{description}");\n'
# 根据需要进行数据处理
# 例如,获取某一列的数据
# column_data = data['column_name']
return res
def save_to_text(insert_sql: str, file_path: str):
"""
保存SQL插入语句到文本文件中。
:param insert_sql: SQL插入语句字符串。
:param file_path: 要保存的文件路径,默认为当前目录下的'output.txt'。
:return: None
"""
with open(file_path, 'w', encoding='utf-8') as file:
file.write(insert_sql)
if __name__ == '__main__':
entities = 'create_final_entities'
file_path = f'excel/{entities}.xlsx'
res = excel_insert_sql(file_path=file_path)
save_to_text(insert_sql=res, file_path=f'txt/{entities}.txt')
print(f'数据处理完成')
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# 四.开发常用
# 1.全局参数
import json
# 读取配置文件
with open('config.json') as f:
config = json.load(f)
# 在全局字典中存储配置参数
global_params = config
def my_function():
# 在函数中使用全局配置参数
print(global_params['param1'])
my_function() # 输出配置文件中的参数值
2
3
4
5
6
7
8
9
10
11
12
13
14
# 2.变量+占位
def template_history(query: str):
content = ("历史消息:{history}\n"
f"{query}")
return content
2
3
4
# 3.受保护属性
在 Python 中,有两种约定用于指示某个属性或方法是受保护的,即不应该被外部直接访问或使用。这些约定是:
单下划线前缀(_): 在属性或方法的名称前加上单个下划线,例如
_protected_variable或_protected_method()。这个约定表示该属性或方法是受保护的,建议外部代码不要直接访问或使用它。虽然这只是一个约定,并没有真正限制外部代码的访问,但它向其他开发人员传达了一个警示,表明这是一个内部实现细节,可能会有变化。双下划线前缀(__): 在属性或方法的名称前加上双下划线,例如
__private_variable或__private_method()。这个约定表示该属性或方法是私有的,应该在类的内部使用,不应该被子类或外部代码直接访问。Python 使用名称修饰(name mangling)来对双下划线前缀进行变换,以避免与子类中相同名称的属性或方法冲突。例如,__private_variable在类内部被转换为_ClassName__private_variable。
在 Python 中,__dict__ 是一个特殊属性,它是一个字典(dictionary),用于存储对象的属性和方法。该字典将对象的属性名作为键,对应的属性值作为值。
通过访问 __dict__ 属性,您可以获取对象的属性和方法字典,从而查看、修改或删除对象的属性和方法。这使得您可以动态地操作对象的属性,而不需要事先知道属性的名称。
# 4.联合类型
使用|进行类型提示(Python 3.10+)
def process_value(value: int | str) -> None:
if isinstance(value, int):
print(f"Integer: {value}")
elif isinstance(value, str):
print(f"String: {value}")
# 使用
process_value(10) # 输出: Integer: 10
process_value("Hello") # 输出: String: Hello
2
3
4
5
6
7
8
9
使用typing.Union进行类型提示(Python 3.5 - 3.9)
from typing import Union
def process_value(value: Union[int, str]) -> None:
if isinstance(value, int):
print(f"Integer: {value}")
elif isinstance(value, str):
print(f"String: {value}")
2
3
4
5
6
7
# 5.多返回值
import numpy as np
def process_array(arr):
min_val = np.min(arr)
max_val = np.max(arr)
return min_val, max_val
# 创建一个二维数组
array = np.array([[1, 2, 3], [4, 5, 6]])
# 使用函数
min_val, max_val = process_array(array)
print(f"Min: {min_val}, Max: {max_val}")
2
3
4
5
6
7
8
9
10
11
12
13
# 6.Optional 使用
在 Python 中,Optional[str] 是一种类型提示,用于表示一个变量可以是 str 类型或 None 类型。这种类型提示通常用于函数参数或返回值,表示该参数或返回值可以为空。
下面是一个使用 Optional[str] 的示例:
def greet(name: Optional[str] = None) -> str:
if name:
return f"Hello, {name}!"
else:
return "Hello, stranger!"
# 调用函数
print(greet("Alice")) # 输出: "Hello, Alice!"
print(greet()) # 输出: "Hello, stranger!"
2
3
4
5
6
7
8
9
在这个例子中:
- 函数
greet的参数name被声明为Optional[str]类型。这意味着它可以接受一个字符串类型的值,也可以接受None值。 - 如果
name参数被传入,函数将返回"Hello, {name}!"字符串。 - 如果
name参数为None(即未传入值),函数将返回"Hello, stranger!"字符串。
使用 Optional[str] 类型提示可以更好地理解代码的预期行为,并在编译时捕获一些潜在的错误。
# 7.提取 json 字符串
为了匹配第一个 { 和最后一个 } 之间的内容,包括 {} 本身,我们可以使用以下正则表达式:
import re
answer_ = """-----key:answer, value:```json
{
"question1": "鲁迅是什么朝代的?",
"question2": "鲁迅是谁?",
"question3": "鲁迅原名是?"
}
```"""
# 正则表达式匹配第一个{和最后一个}之间的内容
pattern = r'\{.*?\}'
match = re.search(pattern, answer_, re.DOTALL)
if match:
result = match.group(0).strip() # 使用strip()去除可能的前后空白字符
else:
result = "没有找到匹配的内容"
print(result)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
这个正则表达式 \{.*?\} 会匹配第一个 { 到最后一个 } 之间的内容。re.DOTALL 标志允许 . 匹配包括换行符在内的任何字符。
# 8.装饰器
下面是一个简单的日志记录装饰器示例:
import functools
import time
def log_decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
print(f"Calling {func.__name__}")
start_time = time.time()
result = func(*args, **kwargs)
end_time = time.time()
print(f"{func.__name__} executed in {end_time - start_time:.2f} seconds")
return result
return wrapper
@log_decorator
def add(a, b):
time.sleep(1) # 模拟耗时操作
return a + b
result = add(5, 3)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 9.关键字
1. Python 关键字简介
Python 关键字是一些保留的词汇,它们在 Python 中有特殊的含义,用于定义程序的结构和流程。根据 Python 官方文档,截至 Python 3.10,共有 35 个关键字:
False,None,True,and,as,assert,async,await,break,class,continue,def,del,elif,else,except,finally,for,from,global,if,import,in,is,lambda,nonlocal,not,or,pass,raise,return,try,while,with,yield
这些关键字覆盖了从条件判断到循环控制,从异常处理到类和函数定义等多个方面。
2. 关键字的分类
Python 关键字可以根据它们的用途进行分类:
- 逻辑控制关键字:
and,not,or,if,elif,else,for,while - 定义和声明关键字:
class,def,global,nonlocal,lambda - 异常处理关键字:
try,except,finally,raise - 模块和包处理关键字:
import,from,as - 赋值和迭代关键字:
in,is,return,yield - 其他关键字:
False,None,True,assert,async,await,break,continue,del,pass,with
3. 关键字的用法
每个关键字都有其特定的用法和含义:
if,elif,else:用于条件判断。if x > 0: print("Positive") elif x == 0: print("Zero") else: print("Negative")1
2
3
4
5
6for,while:用于循环控制。for i in range(5): print(i) while x < 10: x += 11
2
3
4
5class,def:用于定义类和函数。class MyClass: pass def my_function(): pass1
2
3
4
5try,except,finally:用于异常处理。try: # 尝试执行的代码 except Exception as e: # 处理异常 finally: # 无论是否发生异常都会执行的代码1
2
3
4
5
6import,from,as:用于导入模块和包。import os from sys import argv import numpy as np1
2
3return,yield:用于从函数返回值或生成器产生值。def my_function(): return "Hello, World!" def my_generator(): yield 1 yield 21
2
3
4
5
6
# 10.时间戳比较
在 Python 中,可以使用datetime.fromtimestamp()方法将时间戳转换为datetime对象。如果时间戳是毫秒级,需要将其除以 1000 转换为秒。以下是转换的示例代码:
from datetime import datetime
# 假设的时间戳(毫秒级)
create_time_timestamp = 1716548680000
# 将毫秒级时间戳转换为秒
create_time_seconds = create_time_timestamp / 1000
# 转换为datetime对象
create_time = datetime.fromtimestamp(create_time_seconds)
# 指定的日期
compare_date = datetime(2024, 1, 1)
# 比较时间
if create_time > compare_date:
print("createTime is greater than 2024-01-01")
else:
print("createTime is not greater than 2024-01-01")
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 11.获取周一
要编写一个 Python 程序,生成从 2024 年 1 月 1 日到今天(2024 年 6 月 1 日)的所有周一的日期,并打开每个日期对应的网页链接,你可以使用以下代码:
import webbrowser
from datetime import datetime, timedelta
def generate_mondays(start_date, end_date):
# 初始化一个空列表来存储周一的日期
mondays = []
# 从开始日期开始循环,直到结束日期
current_date = start_date
while current_date <= end_date:
if current_date.weekday() == 0: # weekday() 返回0代表周一
mondays.append(current_date)
current_date += timedelta(days=1)
return mondays
def open_webpages(mondays):
# 为每个周一生成URL并打开网页
base_url = "https://mp.csdn.net/poster/"
for monday in mondays:
url = base_url + monday.strftime("%Y%m%d")
webbrowser.open_new(url) # 使用 open_new 来在新标签页中打开网页
# 设置起始日期为2024年1月1日
start_date = datetime(2024, 1, 1)
# 设置结束日期为今天
end_date = datetime(2024, 6, 1)
# 生成所有周一的日期列表
mondays = generate_mondays(start_date, end_date)
# 打开每个周一的网页
open_webpages(mondays)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
这段代码首先定义了一个函数generate_mondays来计算指定范围内的所有周一的日期,然后定义了open_webpages函数来为每个周一生成 URL 并打开网页。最后,它设置了起始日期和结束日期,并调用这些函数。
请注意,这段代码会依次打开从 2024 年 1 月 1 日到 2024 年 6 月 1 日之间所有周一的网页。如果你希望同时打开所有网页,可以修改webbrowser.open_new(url)为webbrowser.open(url),但这样可能会导致浏览器标签页过多,使用户难以管理。使用open_new可以让用户在浏览器中逐个查看每个标签页。
# 12.列表推导式
基本语法:
列表推导式的基本语法结构如下:
[expression for item in iterable if condition]
expression:新列表中的元素表达式。item:从可迭代对象iterable中迭代得到的元素。condition:一个可选的条件表达式,用于过滤满足条件的元素。
简单示例:
让我们从一个简单的例子开始,假设我们有一个数字列表,我们想要创建一个新的列表,其中只包含原列表中的偶数:
original_list = [1, 2, 3, 4, 5, 6]
even_numbers = [num for num in original_list if num % 2 == 0]
print(even_numbers) # 输出: [2, 4, 6]
循环比较:
2
3
4
列表推导式与循环的比较
使用列表推导式与使用传统的 for 循环相比,代码更加简洁,可读性更强。以下是使用 for 循环实现相同功能的代码:
original_list = [1, 2, 3, 4, 5, 6]
even_numbers = []
for num in original_list:
if num % 2 == 0:
even_numbers.append(num)
print(even_numbers) # 输出: [2, 4, 6]
嵌套列表推导式:
2
3
4
5
6
7
列表推导式可以嵌套使用,用于处理更复杂的数据结构,如二维数组的转置:
matrix = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
transposed = [list(row) for row in zip(*matrix)]
print(transposed) # 输出: [[1, 4, 7], [2, 5, 8], [3, 6, 9]]
包含多个 for 子句的列表推导式:
2
3
4
列表推导式可以包含多个 for 子句,用于笛卡尔积等操作:
from itertools import product
x = [1, 2, 3]
y = ['a', 'b']
pairs = [(x, y) for x in x for y in y]
print(pairs) # 输出: [(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b'), (3, 'a'), (3, 'b')]
使用字典推导式:
2
3
4
5
6
列表推导式不仅可以用于列表,还可以用于创建字典:
keys = ['a', 'b', 'c']
values = [1, 2, 3]
my_dict = {k: v for k, v in zip(keys, values)}
print(my_dict) # 输出: {'a': 1, 'b': 2, 'c': 3}
2
3
4
# 13.字典遍历
在 Python 中,遍历字典({})有几种不同的方法,主要取决于你想要遍历字典的键(keys)、值(values)还是键值对(items)。
# 1.遍历所有键
使用.keys()方法可以获取字典中所有的键,然后通过循环遍历这些键。
my_dict = {'a': 1, 'b': 2, 'c': 3}
for key in my_dict.keys():
print(key)
2
3
或者更简洁的方式是直接在字典对象上使用for循环:
for key in my_dict:
print(key)
2
# 2.遍历所有值
使用.values()方法可以获取字典中所有的值,然后通过循环遍历这些值。
for value in my_dict.values():
print(value)
2
# 3.遍历键值对
使用.items()方法可以获取字典中的所有键值对,返回一个包含(key, value)元组的视图对象。
for key, value in my_dict.items():
print(key, value)
2
# 4.使用列表推导式
如果你需要从字典中提取信息并创建列表,可以使用列表推导式。
遍历键:
keys_list = [key for key in my_dict]1遍历值:
values_list = [value for value in my_dict.values()]1遍历键值对:
items_list = [(key, value) for key, value in my_dict.items()]1
# 5.使用字典推导式
如果你需要基于现有字典创建一个新的字典,可以使用字典推导式。
new_dict = {key: value * 2 for key, value in my_dict.items()}
# 6.使用 next()函数
如果你需要访问字典的第一个键值对,可以使用next()函数。
first_item = next(iter(my_dict.items()), None)
# 7.使用 get()方法
如果你需要安全地访问字典中的值,可以使用get()方法,它允许你指定默认值。
value = my_dict.get('key', 'default_value')
# 14.数字的布尔值
0 的 bool 值:
if 0:
print(True)
else:
print(False)
2
3
4
5
返回 False
-1 的 bool 值:
if -1:
print(True)
else:
print(False)
2
3
4
5
返回 True
# 15. 第三方库 simplejson
simplejson是一个第三方库,它是 Python 内置json模块的一个分支,提供了一些额外的功能和性能优化。以下是simplejson的一些特点:
- 性能:在某些场景下,
simplejson可能比内置的json模块有更好的性能表现,尤其是在处理大量数据时。 - 额外的序列化选项:
simplejson提供了一些额外的序列化选项,如use_decimal,允许使用Decimal类型而不是默认的float。 - 更好的错误信息:在发生错误时,
simplejson可能会提供更详细的错误信息,有助于调试。 - 兼容性:
simplejson在某些情况下提供了更好的兼容性,尤其是在处理一些边缘 JSON 格式时。
# 五.缓存设计
# 1. 环境准备
在 Python 中实现缓存,我们可能会用到标准库中的functools.lru_cache装饰器,或者使用第三方库如cachetools。以下是安装cachetools的方法:
pip install cachetools
# 2. 使用functools.lru_cache
Python 标准库中的functools模块提供了一个非常有用的装饰器lru_cache,它可以实现最近最少使用(Least Recently Used,LRU)缓存。这意味着它会缓存最近调用的函数的结果,当缓存满了之后,会淘汰掉最久未被使用的缓存项。
以下是一个使用lru_cache的示例:
import functools
@functools.lru_cache(maxsize=128)
def fibonacci(n):
if n < 2:
return n
return fibonacci(n-1) + fibonacci(n-2)
# 计算斐波那契数列的第10项
print(fibonacci(10))
2
3
4
5
6
7
8
9
10
在这个示例中,fibonacci函数的结果会被缓存,当再次请求相同的参数时,会直接从缓存中获取结果,而不是重新计算。
# 3. 使用cachetools
cachetools是一个提供多种缓存策略的第三方库。它支持 LRU、LFU(Least Frequently Used,最少使用频率)和 RR(Random Replacement,随机替换)等策略。
以下是一个使用cachetools实现 LRU 缓存的示例:
from cachetools import LRUCache
def expensive_function(x):
# 模拟一个耗时的操作
return x * x
cache = LRUCache(maxsize=100)
def cached_expensive_function(x):
if x not in cache:
cache[x] = expensive_function(x)
return cache[x]
# 使用缓存的函数
print(cached_expensive_function(4))
print(cached_expensive_function(4)) # 第二次调用将使用缓存的结果
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
在这个示例中,我们创建了一个LRUCache对象,并使用它来缓存expensive_function函数的结果。
# 4. 文件系统缓存
在某些情况下,我们可能需要将缓存数据持久化到文件系统中。这可以通过将数据序列化并存储到文件中来实现。以下是一个简单的文件系统缓存示例:
import json
import os
CACHE_FILE = 'cache.json'
def load_cache():
if os.path.exists(CACHE_FILE):
with open(CACHE_FILE, 'r') as f:
return json.load(f)
return {}
def save_cache(cache):
with open(CACHE_FILE, 'w') as f:
json.dump(cache, f)
cache = load_cache()
def get_data(key):
if key in cache:
return cache[key]
else:
# 模拟获取数据的过程
data = f"data for {key}"
cache[key] = data
save_cache(cache)
return data
# 使用文件系统缓存
print(get_data("key1"))
print(get_data("key1")) # 第二次调用将使用缓存的数据
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
在这个示例中,我们使用 JSON 文件作为缓存存储,并在需要时加载和保存缓存数据。
# 5.excel 行数
from openpyxl import load_workbook
# 指定Excel文件路径
excel_path = 'test_01.xlsx'
# 加载工作簿
workbook = load_workbook(filename=excel_path)
# 选择活动工作表,默认是第一个工作表
sheet = workbook.active
# 获取工作表的行数
row_count = sheet.max_row
print(f'Excel file has {row_count} rows.')
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 6.判断类型
if vocab_table_ids is not None and isinstance(vocab_table_ids, str):
# 这里是当 vocab_table_ids 是字符串时执行的代码
2
# 7.pathlib 使用
现在,让我们回到最初的问题,深入分析以下代码片段:
search_path = Path(self._root_dir) / (base_dir or "")
这行代码是pathlib模块的一个典型用法,它展示了如何使用Path对象和条件表达式来构建一个搜索路径。下面是对这行代码的逐部分解释:
Path(self._root_dir):这里创建了一个Path对象,其路径由实例变量self._root_dir指定。这个变量代表了类的实例所关联的根目录路径。/:这是Path对象的路径连接运算符,用于将self._root_dir与base_dir连接起来。(base_dir or ""):这是一个 Python 的条件表达式,用于确定要连接的第二个路径部分。如果base_dir是None或者任何被认为是False的值(如空字符串、0 等),则表达式的结果将是一个空字符串。这样,即使base_dir未定义或为None,代码也不会抛出错误。
通过这种方式,search_path变量将存储一个由self._root_dir和base_dir(如果提供了的话)组合而成的新路径。如果base_dir是None或未提供,search_path将只包含self._root_dir的值。
# 8.异步请求
在 Python 里如果用异步函数,就里里外外都需要使用异步函数。否则在异步函数里面使用 time.sleep 这种会阻塞整个事件循环。
模拟网络请求的关系:
time.sleep <===> requests
async.sleep <===> aiohttp
我新增的几个投放、更新卡片的接口是用 aiohttp 支持的
# 异步sleep
await asyncio.sleep(random())
# 同步sleep
time.sleep(random())
2
3
4
5
下面是一个使用aiohttp进行异步 HTTP 请求的例子:
import aiohttp
import asyncio
async def fetch(url, session):
async with session.get(url) as response:
return await response.text()
async def main():
url = 'http://example.com'
async with aiohttp.ClientSession() as session:
html = await fetch(url, session)
print(html)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 9.查看包的大小
du -mh /Users/qinyingjie/miniconda3/envs/llm-graph-builder/lib/python3.11/site-packages/transformers
# 10.并发请求
# 并发请求
from concurrent.futures import ThreadPoolExecutor
import concurrent.futures
futures = []
with ThreadPoolExecutor(max_workers=10) as executor:
for chunk in combined_chunk_document_list:
chunk_doc = Document(
page_content=chunk.page_content.encode("utf-8"), metadata=chunk.metadata
)
futures.append(
executor.submit(llm_transformer.convert_to_graph_documents, [chunk_doc])
)
for i, future in enumerate(concurrent.futures.as_completed(futures)):
graph_document = future.result()
graph_document_list.append(graph_document[0])
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 11.判断代码
enable_user_agent = os.environ.get("ENABLE_USER_AGENT", "False").lower() in ("true", "1", "yes")
# 12.执行 shell 命令
在 Python 中,你可以使用几种方法来执行 shell 命令。以下是一些常用的方法:
os.system(): 这是最简单的方法,它执行一个指定的命令字符串。返回值是命令的退出状态码。import os result = os.system('ls -l') print("Exit status:", result)1
2
3subprocess.run()(推荐方式):subprocess模块提供了更多的灵活性和强大的功能来执行和管理子进程。import subprocess result = subprocess.run(['ls', '-l'], capture_output=True, text=True) print("Output:", result.stdout)1
2
3subprocess.Popen(): 如果你需要更细粒度的控制,可以使用Popen。这个方法允许你执行一个命令并与之交互。import subprocess process = subprocess.Popen(['ls', '-l'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) stdout, stderr = process.communicate() print("Output:", stdout) if process.returncode != 0: print("Error:", stderr)1
2
3
4
5
6os.popen(): 这个方法可以用来执行命令并读取输出。with os.popen('ls -l') as stream: print(stream.read())1
2subprocess.check_output(): 这个方法用于执行命令并获取输出,如果命令返回非零退出状态码,会抛出异常。from subprocess import check_output output = check_output(['ls', '-l'], text=True) print("Output:", output)1
2
3subprocess.call(): 这个方法执行一个命令并等待其完成,返回命令的退出状态码。import subprocess status = subprocess.call(['ls', '-l']) print("Exit status:", status)1
2
3
使用这些方法时,记得处理异常和错误,特别是当命令失败时。subprocess模块是执行 shell 命令的首选方式,因为它提供了更多的控制和灵活性。
# 13.中位数的计算方法
import numpy as np
# 假设我们有一组数据
data = np.array([1, 3, 5, 7, 9])
# 使用 np.median 计算中位数
median_value = np.median(data)
print("中位数是:", median_value)
2
3
4
5
6
7
8
9
# 14.返回 2 个值
class MyClass:
def my_method(self, x: int, y: int) -> tuple[int, int]:
# 这里进行一些操作
result1 = x + y
result2 = x * y
# 返回一个整数元组
return result1, result2
# 创建 MyClass 的实例
my_instance = MyClass()
# 使用实例调用 my_method 方法
result1, result2 = my_instance.my_method(3, 4)
# 打印结果
print(result1) # 输出: 7
print(result2) # 输出: 12
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 15.获取耗时
from datetime import datetime
import time
# 获取当前时间戳并转换为datetime对象
start_time = datetime.fromtimestamp(time.time())
# 程序暂停5秒
time.sleep(5)
# 再次获取当前时间戳并转换为datetime对象
end_time = datetime.fromtimestamp(time.time())
# 计算开始和结束时间的时间差
cost_time = (end_time - start_time)
# 将时间差转换为时分秒格式
hours, remainder = divmod(cost_time.seconds, 3600)
minutes, seconds = divmod(remainder, 60)
# 格式化输出时分秒
formatted_time = f"{hours:02}:{minutes:02}:{seconds:02}"
print(formatted_time)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
00:00:05
# 16.方法调用
在 Python 中,方法通常是用来对对象进行操作的函数。方法可以带有参数,也可以不带参数。当你调用一个方法时,是否使用括号取决于你想要执行的操作。
不带括号:当你引用一个方法而不使用括号时,你实际上是在引用这个方法本身,而不是调用它。这通常用于获取方法的引用,或者在某些情况下,你想要将方法作为参数传递给其他函数。
带括号:当你调用一个方法时,你应该使用括号,即使方法不接受任何参数。括号表示你想要执行这个方法。如果方法需要参数,你还需要在括号内提供这些参数。
例如,假设有一个类UserManager,它有一个方法get_current_user:
class UserManager:
def get_current_user(self):
# 这里是获取当前用户的逻辑
return "Current User"
2
3
4
不带括号:
user_manager.get_current_user这将返回方法对象本身,而不是方法的返回值。带括号:
user_manager.get_current_user()这将调用get_current_user方法,并返回它返回的值,例如 "Current User"。
如果你尝试调用一个不接受参数的方法而不使用括号,Python 解释器会抛出一个TypeError,因为它期望你在调用方法时使用括号。例如:
user_manager = UserManager()
current_user = user_manager.get_current_user() # 正确
# current_user = user_manager.get_current_user # 错误,会引发TypeError
2
3
总结来说,带括号是调用方法的标准方式,而不带括号通常用于引用方法本身。在 Python 中,即使方法不接受任何参数,调用时也应该使用括号。
# 17.处理 parqut 文件
import pandas as pd
# path = '/Users/qinyingjie/Documents/python-workspace/graphrag-server/ragtest/output/20240923-100846/artifacts/create_final_entities.parquet'
path = '/Users/qinyingjie/Documents/python-workspace/graphrag-server/ragtest/output/20240923-100846/artifacts/create_final_entities.parquet'
df = pd.read_parquet(path)
# 统计行数
num_rows = len(df)
print(f"文件 {path} 包含 {num_rows} 行。")
# 检查是否存在 'weight' 字段
if 'description_embedding' in df.columns:
description_embedding = df['description_embedding'][0]
embedding_dim = len(description_embedding)
print(embedding_dim)
print(f"The embedding dimension is: {embedding_dim}")
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 18.获取异常信息
try:
return SearchResult(
response=response,
context_data=context_records,
context_text=context_text,
completion_time=time.time() - start_time,
llm_calls=1,
prompt_tokens=num_tokens(search_prompt, self.token_encoder),
)
except Exception as e:
log.exception("Exception in _map_response_single_batch")
error_message = str(e) # Capture the exception message
return SearchResult(
response=f"Error: {error_message}", # Store the error message in the response
context_data=context_records,
context_text=context_text,
completion_time=time.time() - start_time,
llm_calls=1,
prompt_tokens=num_tokens(search_prompt, self.token_encoder),
)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 19.接口下载 json 文件
data_ = result['data']
orig_name = data_['orig_name']
json_data = json.dumps(data_)
safe_filename = urllib.parse.quote(orig_name + ".json")
headers = {
"Content-Disposition": f'attachment; filename="{safe_filename}"; filename*=UTF-8\'{safe_filename}'
}
return http_response(
content=json_data,
media_type="application/json",
headers=headers
)
2
3
4
5
6
7
8
9
10
11
12
# 一.Python 工具包
# 1.打包
sh build_push.sh 0.0.1.202404131548
# 2.预约明天
pip install /Users/qinyingjie/Documents/python-workspace/kwan_tools/dist/kwan_tools-0.0.1.202404131548-py3-none-any.whl
# 3.场地预约
order-start config.yaml
# 1.枚举的使用
from enum import Enum
class TaskStatus(Enum):
# 枚举成员名称通常大写
RUNNING = "运行中"
TERMINATED = "已终止"
SUCCESS = "成功"
FAILURE = "失败"
# 使用枚举
task = TaskStatus.RUNNING
print(task) # 输出: TaskStatus.RUNNING
print(task.name) # 输出: RUNNING
print(task.value) # 输出: 运行中
2
3
4
5
6
7
8
9
10
11
12
13
14
# 2. 异步任务终止
from typing import Any
from manager.graph.manager import GraphManager
from manager.schemas.manager import SchemasManager
from utils import logger
from fastapi import APIRouter
from manager.graph.query_graph import LocalQueryGraph
import time
import asyncio
GraphRouter = APIRouter(prefix="/test", tags=["测试管理"])
global task_map
task_map: dict[Any, Any] = {}
class GraphRouterMap:
# 长时间运行的异步任务
@staticmethod
async def long_running_task():
try:
while True:
# 模拟任务运行
print("长时间运行的任务正在运行.....")
await asyncio.sleep(2)
except asyncio.CancelledError:
print("任务被取消")
@staticmethod
@GraphRouter.get("/start", summary="启动任务", description="启动任务")
async def start_task():
# 启动长时间任务,并返回任务对象
run_id = time.strftime("%Y%m%d-%H%M%S")
task = asyncio.create_task(GraphRouterMap.long_running_task())
task_map[run_id] = task
return run_id
@staticmethod
@GraphRouter.get("/stop", summary="中止任务", description="中止任务")
async def stop_task(run_id: str):
task = task_map.get(run_id)
task.cancel()
return {"message": "任务取消请求已发送"}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# 3.详细日志
在 Python 中打印详细的堆栈信息,通常可以通过以下方法实现:
使用traceback模块:这是 Python 标准库中的一个模块,专门用于打印堆栈信息。当你遇到异常时,可以使用traceback.print_exc()来打印异常信息和堆栈跟踪。例如:
import traceback
try:
# 你的代码逻辑
# 可能会引发异常的代码
except Exception as e:
traceback.print_exc()
2
3
4
5
6
7
# 4.byte 转 base64
在 Python 中,可以使用base64模块来将bytes对象转换为 Base64 编码的字符串。以下是一个简单的示例:
import base64
# 假设你有一个bytes对象
bytes_data = b'your_bytes_data_here'
# 使用base64.b64encode()函数将bytes对象编码为base64字符串
base64_bytes = base64.b64encode(bytes_data)
# 如果你需要将base64编码的bytes对象转换为字符串,可以这样做
base64_str = base64_bytes.decode('utf-8')
print(base64_str)
2
3
4
5
6
7
8
9
10
11
12
这段代码首先导入了base64模块,然后创建了一个bytes对象。使用base64.b64encode()函数将bytes对象编码为 Base64 格式的bytes对象。最后,使用decode('utf-8')将 Base64 编码的bytes对象转换为字符串。这样就可以得到 Base64 编码的字符串了。
# 5.not all
not all(
dep_id not in self.rest_node_ids
for dep_id in self.generate_routes.answer_dependencies[answer_node_id]
)
2
3
4
逐步分析:
字典访问:
self.generate_routes.answer_dependencies[answer_node_id]是一个字典,其中answer_node_id是键,对应的值是一个包含依赖 ID(dep_id)的列表。遍历列表:
for dep_id in self.generate_routes.answer_dependencies[answer_node_id]是一个 for 循环,它遍历上述列表中的每一个dep_id。条件检查:
dep_id not in self.rest_node_ids是一个条件表达式,检查当前遍历到的dep_id是否不在self.rest_node_ids这个集合中。逻辑取反:
not all(...)是一个not操作符和all函数的组合。如果all()函数的结果为True(即所有dep_id都不在self.rest_node_ids中),则not all(...)的结果为False;如果all()函数的结果为False(即至少有一个dep_id在self.rest_node_ids中),则not all(...)的结果为True。
# 6.any 有任何一个
any(
dep_id in self.rest_node_ids
for dep_id in self.generate_routes.answer_dependencies[answer_node_id]
)
2
3
4
# 7.NaN 如何判空
在 Python 中,当你使用pandas库处理数据时,NaN(Not a Number)是一个特殊的浮点数值,用来表示缺失或未定义的数据。如果你想要检查一个值是否为NaN,你可以使用pandas提供的isna()或isnull()函数,这两个函数在pandas中是等效的,可以互换使用。
以下是一些判断model_ans是否为空(即是否为NaN)的方法:
- 使用
isna()或isnull()函数:
if pd.isna(model_ans) or model_ans.empty:
continue
2
- 直接比较:
由于NaN不等于任何值,包括它自己,所以你可以直接比较:
if model_ans != model_ans:
continue
2
- 使用
np.isnan()(如果你已经导入了numpy库):
import numpy as np
if np.isnan(model_ans):
continue
2
3
4
- 使用
try-except结构来捕获TypeError,因为任何涉及NaN的操作都可能引发TypeError:
try:
# 尝试执行一些操作,如果model_ans是NaN,这里可能会引发TypeError
pass
except TypeError:
continue
2
3
4
5
# 8.日期时间戳
import datetime
# 获取当前时间
now = datetime.datetime.now()
# 转换为时间戳
timestamp = now.timestamp()
# 只保留10位数字
timestamp_10_digits = int(timestamp)
print(timestamp_10_digits)
2
3
4
5
6
7
8
9
10
11
12
# 9.处理空body
from flask import request, jsonify, reqparse
def publish_workflow(app_id):
content_type = request.headers.get("Content-Type", "")
args = {
'workflow_id': '',
'version_description': ''
}
if "application/json" in content_type and request.get_json():
parser = reqparse.RequestParser()
parser.add_argument("workflow_id", type=str, required=False, default='', location="json")
args = parser.parse_args()
workflow_id = args.get("workflow_id", "")
# 你的业务逻辑代码
return jsonify({"message": "Workflow published successfully", "workflow_id": workflow_id, "version_description": version_description}), 200
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 10.python排序
# 假设workflows是一个包含Workflow对象的列表
# 并且Workflow对象有一个version_number属性
# 使用sorted函数进行排序
sorted_workflows = sorted(
workflows,
key=lambda x: int(x.version_number[1:]), # 去掉'V',将剩余部分转换为整数
reverse=True # 逆序排列
)
# 或者使用列表的sort方法直接在原列表上进行排序
workflows.sort(
key=lambda x: int(x.version_number[1:]), # 去掉'V',将剩余部分转换为整数
reverse=True # 逆序排列
)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 11.enumerate 函数的基本概念
enumerate 函数的基本语法如下:
enumerate(iterable, start=0)
iterable:这是一个可迭代对象,可以是列表、元组、字符串等。可迭代对象是指可以被迭代器遍历的对象,即可以使用for循环进行遍历的对象。start:这是一个可选参数,用于指定索引的起始值,默认为 0。通过设置start参数,可以改变索引的起始值,从而满足不同的需求。
在使用 enumerate 函数时,它会返回一个枚举对象,该对象是一个迭代器,每次迭代会返回一个包含索引和值的元组。例如:
contexts = ['apple', 'banana', 'cherry']
for index, value in enumerate(contexts):
print(f"Index: {index}, Value: {value}")
2
3
输出将会是:
Index: 0, Value: apple
Index: 1, Value: banana
Index: 2, Value: cherry
2
3
在这个例子中,enumerate 函数将列表 contexts 中的每个元素与其索引组合成一个元组,并在每次迭代中返回一个元组。
# 12.统计耗时
end_time = time.perf_counter() - start_at
print(f"Time taken: {end_time:.20f} seconds")
2
start_at = time.perf_counter()
end_time = time.perf_counter() - start_at
logger.info(f"tool11111111 Time taken: {end_time:.20f} seconds")
2
3
4
# 13.python执行命令
#!/usr/local/env python3
# -*- coding: utf-8 -*-
import loguru
import os
import click
logger = loguru.logger
logger.add("packs-image.log", format="{time} {level} {message}", level="INFO")
@click.command()
@click.option("--image", help="需要打包的镜像名称")
def image_packs(image):
logger.info(f"需要打包的镜像是:{image}")
tar_name = image.replace(":", "_").split("/")[-1] + ".tar"
logger.info(f"打包后的镜像名称是:{tar_name}")
os.system(f"docker save -o {tar_name} {image}")
logger.info(f"{image} 打包完成")
logger.info(f"{tar_name}压缩镜像")
# 压缩镜像并删除原始镜像
os.system(f"tar -zcvf {tar_name}.gz {tar_name} --remove-files")
logger.info(f"压缩完成")
logger.info(f"镜像打包完成,打包文件是:{tar_name}.gz")
logger.info("上传镜像到OSS")
os.system(f"obsutil cp -f {tar_name}.gz obs://fastagi-web/5000E/一键部署/v2.0.0/")
logger.info(f"上传完成")
logger.info(f"下载链接: https://fastagi-web.obs.cn-southwest-2.myhuaweicloud.com/5000E/一键部署/v2.0.0/{tar_name}.gz")
if __name__ == "__main__":
image_packs()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# 14.执行命令
import os
os.system("free -m")
2
02-路线规划 →