# 一.小工具
# 1.打开单个网页
import webbrowser
url = "https://ask.csdn.net/questions/8078459" # 将链接替换为你想要打开的实际链接
# 在默认浏览器中打开链接
webbrowser.open(url)
2
3
4
5
6
# 2.打开多个网页
import webbrowser
urls = [
"https://www.example1.com",
"https://www.example2.com",
"https://www.example3.com"
] # 用你想要打开的链接替换这些示例链接
# 循环打开每个链接
for url in urls:
webbrowser.open(url)
2
3
4
5
6
7
8
9
10
11
# 3.Notebook 中执行 Shell
# 方式一
!sh script.sh
# 方式二
import subprocess
subprocess.run(['sh', 'script.sh'])
# 方式三
%%bash
sh script.sh
2
3
4
5
6
7
8
9
10
# 4.日志详细打印
logger.error(f"QA error, question: {q}, error: {e}", exc_info=True)
# 5.全局日志
import logging
def set_debug_mode():
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger()
console_handler = logging.StreamHandler()
logger.addHandler(console_handler)
logger.setLevel(logging.DEBUG)
set_debug_mode()
2
3
4
5
6
7
8
9
10
11
12
# 6.执行 cmd 命令
if os.name == "nt":
logger.info("当前系统是 Windows")
# 执行 cmd 命令并获取输出
result = subprocess.run(['cmd', '/c', 'title 新空间代码软件开发工作室 Vx:15045666310'], capture_output=True,
text=True)
# 输出命令的执行结果
logger.info(result.stdout)
2
3
4
5
6
7
# 7.obsutil 安装
vim ~/.bash_profile
source ~/.bash_profile
vim ~/.zshrc
source ~/.zshrc
2
3
4
5
根据提供的错误信息,看起来是在执行脚本build_push.sh
的过程中,出现了obsutil
命令未找到的错误。您还提供了obsutil
实际所在的目录为/Users/qinyingjie/Documents/software/obsutil_darwin_amd64_5.5.12/obsutil
。
要解决这个问题,您可以采取以下几个步骤:
确认路径:首先,请确认
obsutil
工具的实际路径是否正确。您提供的路径为/Users/qinyingjie/Documents/software/obsutil_darwin_amd64_5.5.12/obsutil
,请确保这是obsutil
工具的可执行文件所在的路径。添加路径到环境变量:将
obsutil
所在的目录添加到系统的环境变量PATH
中,这样系统就能够在任何位置找到obsutil
命令。您可以通过编辑相应的配置文件来实现这一点,例如.bashrc
、.bash_profile
或.zshrc
(具体文件名取决于您使用的 Shell)。在文件中添加以下行:export PATH="/Users/qinyingjie/Documents/software/obsutil_darwin_amd64_5.5.12:$PATH"
1然后保存文件并重新启动终端或使用
source
命令使更改生效。测试命令:重新运行
build_push.sh
脚本,看看是否仍然报告obsutil
命令未找到的错误。如果一切正常,脚本应该能够找到并执行obsutil
命令。
请注意,以上步骤假设您提供的路径确实包含了可执行的obsutil
命令文件。如果路径不正确或文件不可执行,您可能需要重新下载或安装obsutil
工具,并确保将其正确配置到系统中。
如果问题仍然存在,建议查看脚本文件build_push.sh
的内容,确保在调用obsutil
命令之前没有其他错误或问题。
# 8.非异步调用异步方法
import asyncio
# 异步方法
async def async_function():
# 执行异步操作
await asyncio.sleep(1)
return "异步方法执行完成"
# 非异步方法
def sync_function():
# 调用异步方法并获取结果
result = asyncio.run(async_function())
# 打印结果
print(result)
# 调用非异步方法
sync_function()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 9.Dockerfile 项目(python)
# 使用官方的 Python 3.11.5 镜像作为基础镜像,并选择较小的 slim 版本
FROM python:3.11.5-slim
# 设置工作目录
WORKDIR /base-agent/
# 将 requirements.txt 复制到工作目录,并使用官方的清华镜像源安装依赖
COPY requirements.txt /base-agent/
RUN pip install --upgrade pip && \
pip install -r /base-agent/requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple
# 将应用代码复制到工作目录
COPY . /base-agent/
# 清理不需要的文件,减小镜像大小
RUN rm -rf /var/lib/apt/lists/* \
&& rm -rf /base-agent/__pycache__ \
&& rm -rf /base-agent/*.pyc \
&& rm -rf /base-agent/.cache \
&& rm -rf /base-agent/*.log
# 设置容器启动时执行的命令
ENTRYPOINT ["sh", "./run.sh"]
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 二.集合操作
# 1.字典合并
有一个字典 base_config 和一个 yaml 文件,需要将 yaml 文件中存在但是 base_config 中不存在的字段添加到 base_config 中
extra_config:
- id: '1'
name: John
age: 25
- id: '2'
name: Jane
age: 30
- id: '3'
name: Alex
age: 28
2
3
4
5
6
7
8
9
10
import yaml
import json
filename = '0326/config.yaml'
with open(filename, "r", encoding="utf-8") as yaml_file:
config = yaml.safe_load(yaml_file)
# json_str = json.dumps(config)
base_config = {
'id': '1',
'name': 'test1220',
}
# 合并agent的配置,如果
if base_config is not None:
# 读取yaml的数据
extra_configs = config['extra_config']
if extra_configs is not None:
# 这里是字典
filtered_config = next(
(item_config for item_config in extra_configs if item_config['id'] == base_config['id']), None)
if filtered_config is not None:
for key, value in filtered_config.items():
if key not in base_config:
base_config[key] = value
json_str = json.dumps(base_config)
print(json_str)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# 2.字符串长度
string = "方法进行确定。"
length = len(string)
print("字符串长度:", length)
2
3
4
urls = [
'11111111111'
,
'22222'
,
'33333'
]
# 循环打开每个链接
for url in urls:
print(len(url))
2
3
4
5
6
7
8
9
10
11
12
# 3.指定位置遍历
arr = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
start_index = -4
for element in arr[start_index:]:
# 执行遍历操作,对元素进行处理
print(element)
2
3
4
5
6
# 4.删除列表元素
# 使用pop删除
my_list = [1, 2, 3, 4, 5]
removed_element = my_list.pop(2)
print("被移除的元素:", removed_element)
print("列表剩余元素:", my_list)
# 使用切片删除
my_list = [1, 2, 3, 4, 5]
del my_list[:2]
print("删除后的列表:", my_list)
2
3
4
5
6
7
8
9
10
# 5.列表推导式
ids = [user_info.id for user_info in users] if users else []
# 6.字符串 '[22]' 转换为列表
要将字符串 '[22]'
转换为列表,您可以使用 eval()
函数或者使用 Python 内置的 JSON 模块进行转换。以下是两种方法的示例:
方法 1: 使用 eval()
函数
string = '[22]'
my_list = eval(string)
print(my_list)
2
3
方法 2: 使用 JSON 模块
import json
string = '[22]'
my_list = json.loads(string)
print(my_list)
2
3
4
5
# 7.字典字节串
# 创建字节串
byte_string = b'hello'
# 访问字节串中的字节
print(byte_string[0]) # 输出:104
# 将字节串解码为普通字符串
decoded_string = byte_string.decode('utf-8')
print(decoded_string) # 输出:hello
# 将普通字符串编码为字节串
encoded_string = 'world'.encode('utf-8')
print(encoded_string) # 输出:b'world'
# 字节串之间的拼接
concatenated_bytes = byte_string + encoded_string
print(concatenated_bytes) # 输出:b'helloworld'
# 字典字节字符串
dictionary = {b'relationship': Value(sVal=b'\xe5\x8c\x85\xe6\x8b\xac')}
sVal_value = dictionary[b'relationship'].sVal.decode('utf-8')
print(sVal_value)
# 字典字节字符串
dictionary = {b'relationship': Value(sVal=b'\xe5\x8c\x85\xe6\x8b\xac')}
sVal_value = dictionary[b'relationship'].sVal
print(sVal_value)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 8.列表推导式
首先,我们需要一个函数来获取配置文件中的特定键值:
def get_config_key(key):
# 假设config_data是从配置文件中加载的数据
config_data = {
"name": [
{"type": "web", "name": "Server1"},
{"type": "db", "name": "Database1"}
]
}
return config_data.get(key, [])
2
3
4
5
6
7
8
9
这个函数get_config_key
接收一个键名作为参数,并返回与该键名相关联的值。如果键不存在,它将返回一个空列表。
接下来,我们使用列表推导式来构建一个新的数据结构:
names = get_config_key("name")
config['name_server'] = [{item['type']: item['name']} for item in names]
2
在这段代码中,我们首先调用get_config_key
函数来获取所有名称条目。然后,我们使用列表推导式来创建一个新的列表,其中的每个元素都是一个字典,这个字典包含服务器的类型作为键,服务器的名称作为值。
# 三.文件操作
# 1.yaml 读取
在 Python 中,我们可以使用 PyYAML 库来读取和解析 YAML 数据。首先,我们需要安装 PyYAML 库,可以使用 pip 命令进行安装:
pip install pyyaml
安装完成后,我们可以使用以下代码读取 YAML 文件并解析其中的数据:
import yaml
filename = 'data.yaml'
try:
with open(filename, 'r', encoding="utf-8") as file:
yaml_data = yaml.safe_load(file)
print(yaml_data)
except FileNotFoundError:
print(f"File '{filename}' not found.")
except yaml.YAMLError as e:
print(f"Error while loading YAML: {e}")
except Exception as e:
print(f"An error occurred: {e}")
2
3
4
5
6
7
8
9
10
11
12
13
14
yaml数据
person:
name: John Smith
age: 30
hobbies:
- reading
- hiking
2
3
4
5
6
访问方式:
print(yaml_data['person']['name']) # 输出:John Smith
print(yaml_data['person']['age']) # 输出:30
print(yaml_data['person']['hobbies']) # 输出:['reading', 'hiking']
2
3
# 2.使用方式
要在 YAML 文件中添加一个名为agent_config
的数组,每个数组元素都是一个对象,对象包含id
、name
和age
三个字段,可以使用以下示例:
agent_config:
- id: 1
name: John
age: 25
- id: 2
name: Jane
age: 30
- id: 3
name: Alex
age: 28
2
3
4
5
6
7
8
9
10
在这个示例中,agent_config
是数组的键名,后面的每个缩进的部分表示数组的元素。每个元素都是一个对象,使用键值对的形式表示,其中id
、name
和age
是字段名,后面是对应的值。
如果你想在 Python 代码中动态添加这样的数组数据并将其写入 YAML 文件,可以使用yaml
库提供的方法。以下是一个示例:
import yaml
data = {
'agent_config': [
{'id': 1, 'name': 'John', 'age': 25},
{'id': 2, 'name': 'Jane', 'age': 30},
{'id': 3, 'name': 'Alex', 'age': 28}
]
}
filename = 'config.yaml'
with open(filename, 'w', encoding='utf-8') as yaml_file:
yaml.dump(data, yaml_file)
2
3
4
5
6
7
8
9
10
11
12
13
在这个示例中,我们创建了一个字典data
,其中包含一个键为agent_config
的数组,数组的元素是包含id
、name
和age
字段的字典对象。然后,我们将data
字典写入到名为config.yaml
的 YAML 文件中。
# 3.json 数据写入 json 文件
import json
# 生成要写入的JSON数据
data = {
"name": "John",
"age": 30,
"city": "New York"
}
2
3
4
5
6
7
8
# 指定要写入的JSON文件路径
file_path = "path/to/your/file.json"
# 将JSON数据写入文件
with open(file_path, "w") as json_file:
json.dump(data, json_file)
2
3
4
5
6
# 4.读取目录下的文件
import os
def read_parquet_file(directory):
"""
读取指定目录下的所有.parquet文件
:param directory: 要搜索的目录路径
:return: 包含所有Parquet文件名的列表
"""
data = []
for root, dirs, files in os.walk(directory):
for file in files:
# 检查文件扩展名是否为.parquet
if file.endswith('.parquet'):
# 构建文件的完整路径
full_path = os.path.join(root, file)
# 读取Parquet文件,这里假设使用pandas
df = pd.read_parquet(full_path)
# 将DataFrame添加到数据列表中
data.append(df)
return data
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 5.Parquet 转 Excel
import pyarrow.parquet as pq
import pandas as pd
import os
def parquet2excel(file_name):
"""
将.parquet文件转换为Excel文件
:param file_name: Parquet文件的路径
:return: 无返回值,但会打印保存路径
"""
# 读取Parquet文件
parquet_file = pq.ParquetFile(file_name)
data = parquet_file.read().to_pandas()
# 将读取的数据转换为pandas DataFrame
df = pd.DataFrame(data)
# 构建Excel文件的保存路径
excel_path = file_name.replace('.parquet', '.xlsx')
# 确保保存路径的目录存在
os.makedirs(os.path.dirname(f'excel/{excel_path}'), exist_ok=True)
# 将DataFrame写入Excel文件
df.to_excel(f'excel/{excel_path}', index=False)
# 打印保存路径
print(f'数据已保存到 {excel_path}')
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# 6.excel 转 text 文件
import pandas as pd
def excel_insert_sql(file_path: str):
"""
根据excel内容生成insert语句
:param file_path: Excel文件的路径
:return: SQL插入语句字符串
"""
res = ''
# 如果Excel文件有多个sheet,可以通过sheet_name参数指定要读取的sheet
data = pd.read_excel(file_path, sheet_name='Sheet1')
for index, row in data.iterrows():
name = row['name']
name = name.replace('"', '')
description = row['description']
description = str(description).replace('"', '')
print(f'insert vertex entity values "{name}":("{description}");\n')
res += f'insert vertex entity values "{name}":("{description}");\n'
# 根据需要进行数据处理
# 例如,获取某一列的数据
# column_data = data['column_name']
return res
def save_to_text(insert_sql: str, file_path: str):
"""
保存SQL插入语句到文本文件中。
:param insert_sql: SQL插入语句字符串。
:param file_path: 要保存的文件路径,默认为当前目录下的'output.txt'。
:return: None
"""
with open(file_path, 'w', encoding='utf-8') as file:
file.write(insert_sql)
if __name__ == '__main__':
entities = 'create_final_entities'
file_path = f'excel/{entities}.xlsx'
res = excel_insert_sql(file_path=file_path)
save_to_text(insert_sql=res, file_path=f'txt/{entities}.txt')
print(f'数据处理完成')
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# 四.开发常用
# 1.全局参数
import json
# 读取配置文件
with open('config.json') as f:
config = json.load(f)
# 在全局字典中存储配置参数
global_params = config
def my_function():
# 在函数中使用全局配置参数
print(global_params['param1'])
my_function() # 输出配置文件中的参数值
2
3
4
5
6
7
8
9
10
11
12
13
14
# 2.变量+占位
def template_history(query: str):
content = ("历史消息:{history}\n"
f"{query}")
return content
2
3
4
# 3.受保护属性
在 Python 中,有两种约定用于指示某个属性或方法是受保护的,即不应该被外部直接访问或使用。这些约定是:
单下划线前缀(_): 在属性或方法的名称前加上单个下划线,例如
_protected_variable
或_protected_method()
。这个约定表示该属性或方法是受保护的,建议外部代码不要直接访问或使用它。虽然这只是一个约定,并没有真正限制外部代码的访问,但它向其他开发人员传达了一个警示,表明这是一个内部实现细节,可能会有变化。双下划线前缀(__): 在属性或方法的名称前加上双下划线,例如
__private_variable
或__private_method()
。这个约定表示该属性或方法是私有的,应该在类的内部使用,不应该被子类或外部代码直接访问。Python 使用名称修饰(name mangling)来对双下划线前缀进行变换,以避免与子类中相同名称的属性或方法冲突。例如,__private_variable
在类内部被转换为_ClassName__private_variable
。
在 Python 中,__dict__
是一个特殊属性,它是一个字典(dictionary),用于存储对象的属性和方法。该字典将对象的属性名作为键,对应的属性值作为值。
通过访问 __dict__
属性,您可以获取对象的属性和方法字典,从而查看、修改或删除对象的属性和方法。这使得您可以动态地操作对象的属性,而不需要事先知道属性的名称。
# 4.联合类型
使用|
进行类型提示(Python 3.10+)
def process_value(value: int | str) -> None:
if isinstance(value, int):
print(f"Integer: {value}")
elif isinstance(value, str):
print(f"String: {value}")
# 使用
process_value(10) # 输出: Integer: 10
process_value("Hello") # 输出: String: Hello
2
3
4
5
6
7
8
9
使用typing.Union
进行类型提示(Python 3.5 - 3.9)
from typing import Union
def process_value(value: Union[int, str]) -> None:
if isinstance(value, int):
print(f"Integer: {value}")
elif isinstance(value, str):
print(f"String: {value}")
2
3
4
5
6
7
# 5.多返回值
import numpy as np
def process_array(arr):
min_val = np.min(arr)
max_val = np.max(arr)
return min_val, max_val
# 创建一个二维数组
array = np.array([[1, 2, 3], [4, 5, 6]])
# 使用函数
min_val, max_val = process_array(array)
print(f"Min: {min_val}, Max: {max_val}")
2
3
4
5
6
7
8
9
10
11
12
13
# 6.Optional 使用
在 Python 中,Optional[str]
是一种类型提示,用于表示一个变量可以是 str
类型或 None
类型。这种类型提示通常用于函数参数或返回值,表示该参数或返回值可以为空。
下面是一个使用 Optional[str]
的示例:
def greet(name: Optional[str] = None) -> str:
if name:
return f"Hello, {name}!"
else:
return "Hello, stranger!"
# 调用函数
print(greet("Alice")) # 输出: "Hello, Alice!"
print(greet()) # 输出: "Hello, stranger!"
2
3
4
5
6
7
8
9
在这个例子中:
- 函数
greet
的参数name
被声明为Optional[str]
类型。这意味着它可以接受一个字符串类型的值,也可以接受None
值。 - 如果
name
参数被传入,函数将返回"Hello, {name}!"
字符串。 - 如果
name
参数为None
(即未传入值),函数将返回"Hello, stranger!"
字符串。
使用 Optional[str]
类型提示可以更好地理解代码的预期行为,并在编译时捕获一些潜在的错误。
# 7.提取 json 字符串
为了匹配第一个 {
和最后一个 }
之间的内容,包括 {}
本身,我们可以使用以下正则表达式:
import re
answer_ = """-----key:answer, value:```json
{
"question1": "鲁迅是什么朝代的?",
"question2": "鲁迅是谁?",
"question3": "鲁迅原名是?"
}
```"""
# 正则表达式匹配第一个{和最后一个}之间的内容
pattern = r'\{.*?\}'
match = re.search(pattern, answer_, re.DOTALL)
if match:
result = match.group(0).strip() # 使用strip()去除可能的前后空白字符
else:
result = "没有找到匹配的内容"
print(result)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
这个正则表达式 \{.*?\}
会匹配第一个 {
到最后一个 }
之间的内容。re.DOTALL
标志允许 .
匹配包括换行符在内的任何字符。
# 8.装饰器
下面是一个简单的日志记录装饰器示例:
import functools
import time
def log_decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
print(f"Calling {func.__name__}")
start_time = time.time()
result = func(*args, **kwargs)
end_time = time.time()
print(f"{func.__name__} executed in {end_time - start_time:.2f} seconds")
return result
return wrapper
@log_decorator
def add(a, b):
time.sleep(1) # 模拟耗时操作
return a + b
result = add(5, 3)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 9.关键字
1. Python 关键字简介
Python 关键字是一些保留的词汇,它们在 Python 中有特殊的含义,用于定义程序的结构和流程。根据 Python 官方文档,截至 Python 3.10,共有 35 个关键字:
False
,None
,True
,and
,as
,assert
,async
,await
,break
,class
,continue
,def
,del
,elif
,else
,except
,finally
,for
,from
,global
,if
,import
,in
,is
,lambda
,nonlocal
,not
,or
,pass
,raise
,return
,try
,while
,with
,yield
这些关键字覆盖了从条件判断到循环控制,从异常处理到类和函数定义等多个方面。
2. 关键字的分类
Python 关键字可以根据它们的用途进行分类:
- 逻辑控制关键字:
and
,not
,or
,if
,elif
,else
,for
,while
- 定义和声明关键字:
class
,def
,global
,nonlocal
,lambda
- 异常处理关键字:
try
,except
,finally
,raise
- 模块和包处理关键字:
import
,from
,as
- 赋值和迭代关键字:
in
,is
,return
,yield
- 其他关键字:
False
,None
,True
,assert
,async
,await
,break
,continue
,del
,pass
,with
3. 关键字的用法
每个关键字都有其特定的用法和含义:
if
,elif
,else
:用于条件判断。if x > 0: print("Positive") elif x == 0: print("Zero") else: print("Negative")
1
2
3
4
5
6for
,while
:用于循环控制。for i in range(5): print(i) while x < 10: x += 1
1
2
3
4
5class
,def
:用于定义类和函数。class MyClass: pass def my_function(): pass
1
2
3
4
5try
,except
,finally
:用于异常处理。try: # 尝试执行的代码 except Exception as e: # 处理异常 finally: # 无论是否发生异常都会执行的代码
1
2
3
4
5
6import
,from
,as
:用于导入模块和包。import os from sys import argv import numpy as np
1
2
3return
,yield
:用于从函数返回值或生成器产生值。def my_function(): return "Hello, World!" def my_generator(): yield 1 yield 2
1
2
3
4
5
6
# 10.时间戳比较
在 Python 中,可以使用datetime.fromtimestamp()
方法将时间戳转换为datetime
对象。如果时间戳是毫秒级,需要将其除以 1000 转换为秒。以下是转换的示例代码:
from datetime import datetime
# 假设的时间戳(毫秒级)
create_time_timestamp = 1716548680000
# 将毫秒级时间戳转换为秒
create_time_seconds = create_time_timestamp / 1000
# 转换为datetime对象
create_time = datetime.fromtimestamp(create_time_seconds)
# 指定的日期
compare_date = datetime(2024, 1, 1)
# 比较时间
if create_time > compare_date:
print("createTime is greater than 2024-01-01")
else:
print("createTime is not greater than 2024-01-01")
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 11.获取周一
要编写一个 Python 程序,生成从 2024 年 1 月 1 日到今天(2024 年 6 月 1 日)的所有周一的日期,并打开每个日期对应的网页链接,你可以使用以下代码:
import webbrowser
from datetime import datetime, timedelta
def generate_mondays(start_date, end_date):
# 初始化一个空列表来存储周一的日期
mondays = []
# 从开始日期开始循环,直到结束日期
current_date = start_date
while current_date <= end_date:
if current_date.weekday() == 0: # weekday() 返回0代表周一
mondays.append(current_date)
current_date += timedelta(days=1)
return mondays
def open_webpages(mondays):
# 为每个周一生成URL并打开网页
base_url = "https://mp.csdn.net/poster/"
for monday in mondays:
url = base_url + monday.strftime("%Y%m%d")
webbrowser.open_new(url) # 使用 open_new 来在新标签页中打开网页
# 设置起始日期为2024年1月1日
start_date = datetime(2024, 1, 1)
# 设置结束日期为今天
end_date = datetime(2024, 6, 1)
# 生成所有周一的日期列表
mondays = generate_mondays(start_date, end_date)
# 打开每个周一的网页
open_webpages(mondays)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
这段代码首先定义了一个函数generate_mondays
来计算指定范围内的所有周一的日期,然后定义了open_webpages
函数来为每个周一生成 URL 并打开网页。最后,它设置了起始日期和结束日期,并调用这些函数。
请注意,这段代码会依次打开从 2024 年 1 月 1 日到 2024 年 6 月 1 日之间所有周一的网页。如果你希望同时打开所有网页,可以修改webbrowser.open_new(url)
为webbrowser.open(url)
,但这样可能会导致浏览器标签页过多,使用户难以管理。使用open_new
可以让用户在浏览器中逐个查看每个标签页。
# 12.列表推导式
基本语法:
列表推导式的基本语法结构如下:
[expression for item in iterable if condition]
expression
:新列表中的元素表达式。item
:从可迭代对象iterable
中迭代得到的元素。condition
:一个可选的条件表达式,用于过滤满足条件的元素。
简单示例:
让我们从一个简单的例子开始,假设我们有一个数字列表,我们想要创建一个新的列表,其中只包含原列表中的偶数:
original_list = [1, 2, 3, 4, 5, 6]
even_numbers = [num for num in original_list if num % 2 == 0]
print(even_numbers) # 输出: [2, 4, 6]
循环比较:
2
3
4
列表推导式与循环的比较
使用列表推导式与使用传统的 for 循环相比,代码更加简洁,可读性更强。以下是使用 for 循环实现相同功能的代码:
original_list = [1, 2, 3, 4, 5, 6]
even_numbers = []
for num in original_list:
if num % 2 == 0:
even_numbers.append(num)
print(even_numbers) # 输出: [2, 4, 6]
嵌套列表推导式:
2
3
4
5
6
7
列表推导式可以嵌套使用,用于处理更复杂的数据结构,如二维数组的转置:
matrix = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
transposed = [list(row) for row in zip(*matrix)]
print(transposed) # 输出: [[1, 4, 7], [2, 5, 8], [3, 6, 9]]
包含多个 for 子句的列表推导式:
2
3
4
列表推导式可以包含多个 for 子句,用于笛卡尔积等操作:
from itertools import product
x = [1, 2, 3]
y = ['a', 'b']
pairs = [(x, y) for x in x for y in y]
print(pairs) # 输出: [(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b'), (3, 'a'), (3, 'b')]
使用字典推导式:
2
3
4
5
6
列表推导式不仅可以用于列表,还可以用于创建字典:
keys = ['a', 'b', 'c']
values = [1, 2, 3]
my_dict = {k: v for k, v in zip(keys, values)}
print(my_dict) # 输出: {'a': 1, 'b': 2, 'c': 3}
2
3
4
# 13.字典遍历
在 Python 中,遍历字典({}
)有几种不同的方法,主要取决于你想要遍历字典的键(keys)、值(values)还是键值对(items)。
# 1.遍历所有键
使用.keys()
方法可以获取字典中所有的键,然后通过循环遍历这些键。
my_dict = {'a': 1, 'b': 2, 'c': 3}
for key in my_dict.keys():
print(key)
2
3
或者更简洁的方式是直接在字典对象上使用for
循环:
for key in my_dict:
print(key)
2
# 2.遍历所有值
使用.values()
方法可以获取字典中所有的值,然后通过循环遍历这些值。
for value in my_dict.values():
print(value)
2
# 3.遍历键值对
使用.items()
方法可以获取字典中的所有键值对,返回一个包含(key, value)
元组的视图对象。
for key, value in my_dict.items():
print(key, value)
2
# 4.使用列表推导式
如果你需要从字典中提取信息并创建列表,可以使用列表推导式。
遍历键:
keys_list = [key for key in my_dict]
1遍历值:
values_list = [value for value in my_dict.values()]
1遍历键值对:
items_list = [(key, value) for key, value in my_dict.items()]
1
# 5.使用字典推导式
如果你需要基于现有字典创建一个新的字典,可以使用字典推导式。
new_dict = {key: value * 2 for key, value in my_dict.items()}
# 6.使用 next()函数
如果你需要访问字典的第一个键值对,可以使用next()
函数。
first_item = next(iter(my_dict.items()), None)
# 7.使用 get()方法
如果你需要安全地访问字典中的值,可以使用get()
方法,它允许你指定默认值。
value = my_dict.get('key', 'default_value')
# 14.数字的布尔值
0 的 bool 值:
if 0:
print(True)
else:
print(False)
2
3
4
5
返回 False
-1 的 bool 值:
if -1:
print(True)
else:
print(False)
2
3
4
5
返回 True
# 15. 第三方库 simplejson
simplejson
是一个第三方库,它是 Python 内置json
模块的一个分支,提供了一些额外的功能和性能优化。以下是simplejson
的一些特点:
- 性能:在某些场景下,
simplejson
可能比内置的json
模块有更好的性能表现,尤其是在处理大量数据时。 - 额外的序列化选项:
simplejson
提供了一些额外的序列化选项,如use_decimal
,允许使用Decimal
类型而不是默认的float
。 - 更好的错误信息:在发生错误时,
simplejson
可能会提供更详细的错误信息,有助于调试。 - 兼容性:
simplejson
在某些情况下提供了更好的兼容性,尤其是在处理一些边缘 JSON 格式时。
# 五.缓存设计
# 1. 环境准备
在 Python 中实现缓存,我们可能会用到标准库中的functools.lru_cache
装饰器,或者使用第三方库如cachetools
。以下是安装cachetools
的方法:
pip install cachetools
# 2. 使用functools.lru_cache
Python 标准库中的functools
模块提供了一个非常有用的装饰器lru_cache
,它可以实现最近最少使用(Least Recently Used,LRU)缓存。这意味着它会缓存最近调用的函数的结果,当缓存满了之后,会淘汰掉最久未被使用的缓存项。
以下是一个使用lru_cache
的示例:
import functools
@functools.lru_cache(maxsize=128)
def fibonacci(n):
if n < 2:
return n
return fibonacci(n-1) + fibonacci(n-2)
# 计算斐波那契数列的第10项
print(fibonacci(10))
2
3
4
5
6
7
8
9
10
在这个示例中,fibonacci
函数的结果会被缓存,当再次请求相同的参数时,会直接从缓存中获取结果,而不是重新计算。
# 3. 使用cachetools
cachetools
是一个提供多种缓存策略的第三方库。它支持 LRU、LFU(Least Frequently Used,最少使用频率)和 RR(Random Replacement,随机替换)等策略。
以下是一个使用cachetools
实现 LRU 缓存的示例:
from cachetools import LRUCache
def expensive_function(x):
# 模拟一个耗时的操作
return x * x
cache = LRUCache(maxsize=100)
def cached_expensive_function(x):
if x not in cache:
cache[x] = expensive_function(x)
return cache[x]
# 使用缓存的函数
print(cached_expensive_function(4))
print(cached_expensive_function(4)) # 第二次调用将使用缓存的结果
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
在这个示例中,我们创建了一个LRUCache
对象,并使用它来缓存expensive_function
函数的结果。
# 4. 文件系统缓存
在某些情况下,我们可能需要将缓存数据持久化到文件系统中。这可以通过将数据序列化并存储到文件中来实现。以下是一个简单的文件系统缓存示例:
import json
import os
CACHE_FILE = 'cache.json'
def load_cache():
if os.path.exists(CACHE_FILE):
with open(CACHE_FILE, 'r') as f:
return json.load(f)
return {}
def save_cache(cache):
with open(CACHE_FILE, 'w') as f:
json.dump(cache, f)
cache = load_cache()
def get_data(key):
if key in cache:
return cache[key]
else:
# 模拟获取数据的过程
data = f"data for {key}"
cache[key] = data
save_cache(cache)
return data
# 使用文件系统缓存
print(get_data("key1"))
print(get_data("key1")) # 第二次调用将使用缓存的数据
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
在这个示例中,我们使用 JSON 文件作为缓存存储,并在需要时加载和保存缓存数据。
# 5.excel 行数
from openpyxl import load_workbook
# 指定Excel文件路径
excel_path = 'test_01.xlsx'
# 加载工作簿
workbook = load_workbook(filename=excel_path)
# 选择活动工作表,默认是第一个工作表
sheet = workbook.active
# 获取工作表的行数
row_count = sheet.max_row
print(f'Excel file has {row_count} rows.')
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 6.判断类型
if vocab_table_ids is not None and isinstance(vocab_table_ids, str):
# 这里是当 vocab_table_ids 是字符串时执行的代码
2
# 7.pathlib 使用
现在,让我们回到最初的问题,深入分析以下代码片段:
search_path = Path(self._root_dir) / (base_dir or "")
这行代码是pathlib
模块的一个典型用法,它展示了如何使用Path
对象和条件表达式来构建一个搜索路径。下面是对这行代码的逐部分解释:
Path(self._root_dir)
:这里创建了一个Path
对象,其路径由实例变量self._root_dir
指定。这个变量代表了类的实例所关联的根目录路径。/
:这是Path
对象的路径连接运算符,用于将self._root_dir
与base_dir
连接起来。(base_dir or "")
:这是一个 Python 的条件表达式,用于确定要连接的第二个路径部分。如果base_dir
是None
或者任何被认为是False
的值(如空字符串、0 等),则表达式的结果将是一个空字符串。这样,即使base_dir
未定义或为None
,代码也不会抛出错误。
通过这种方式,search_path
变量将存储一个由self._root_dir
和base_dir
(如果提供了的话)组合而成的新路径。如果base_dir
是None
或未提供,search_path
将只包含self._root_dir
的值。
# 8.异步请求
在 Python 里如果用异步函数,就里里外外都需要使用异步函数。否则在异步函数里面使用 time.sleep 这种会阻塞整个事件循环。
模拟网络请求的关系:
time.sleep <===> requests
async.sleep <===> aiohttp
我新增的几个投放、更新卡片的接口是用 aiohttp 支持的
# 异步sleep
await asyncio.sleep(random())
# 同步sleep
time.sleep(random())
2
3
4
5
下面是一个使用aiohttp
进行异步 HTTP 请求的例子:
import aiohttp
import asyncio
async def fetch(url, session):
async with session.get(url) as response:
return await response.text()
async def main():
url = 'http://example.com'
async with aiohttp.ClientSession() as session:
html = await fetch(url, session)
print(html)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 9.查看包的大小
du -mh /Users/qinyingjie/miniconda3/envs/llm-graph-builder/lib/python3.11/site-packages/transformers
# 10.并发请求
# 并发请求
from concurrent.futures import ThreadPoolExecutor
import concurrent.futures
futures = []
with ThreadPoolExecutor(max_workers=10) as executor:
for chunk in combined_chunk_document_list:
chunk_doc = Document(
page_content=chunk.page_content.encode("utf-8"), metadata=chunk.metadata
)
futures.append(
executor.submit(llm_transformer.convert_to_graph_documents, [chunk_doc])
)
for i, future in enumerate(concurrent.futures.as_completed(futures)):
graph_document = future.result()
graph_document_list.append(graph_document[0])
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 11.判断代码
enable_user_agent = os.environ.get("ENABLE_USER_AGENT", "False").lower() in ("true", "1", "yes")
# 12.执行 shell 命令
在 Python 中,你可以使用几种方法来执行 shell 命令。以下是一些常用的方法:
os.system()
: 这是最简单的方法,它执行一个指定的命令字符串。返回值是命令的退出状态码。import os result = os.system('ls -l') print("Exit status:", result)
1
2
3subprocess.run()
(推荐方式):subprocess
模块提供了更多的灵活性和强大的功能来执行和管理子进程。import subprocess result = subprocess.run(['ls', '-l'], capture_output=True, text=True) print("Output:", result.stdout)
1
2
3subprocess.Popen()
: 如果你需要更细粒度的控制,可以使用Popen
。这个方法允许你执行一个命令并与之交互。import subprocess process = subprocess.Popen(['ls', '-l'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) stdout, stderr = process.communicate() print("Output:", stdout) if process.returncode != 0: print("Error:", stderr)
1
2
3
4
5
6os.popen()
: 这个方法可以用来执行命令并读取输出。with os.popen('ls -l') as stream: print(stream.read())
1
2subprocess.check_output()
: 这个方法用于执行命令并获取输出,如果命令返回非零退出状态码,会抛出异常。from subprocess import check_output output = check_output(['ls', '-l'], text=True) print("Output:", output)
1
2
3subprocess.call()
: 这个方法执行一个命令并等待其完成,返回命令的退出状态码。import subprocess status = subprocess.call(['ls', '-l']) print("Exit status:", status)
1
2
3
使用这些方法时,记得处理异常和错误,特别是当命令失败时。subprocess
模块是执行 shell 命令的首选方式,因为它提供了更多的控制和灵活性。
# 13.中位数的计算方法
import numpy as np
# 假设我们有一组数据
data = np.array([1, 3, 5, 7, 9])
# 使用 np.median 计算中位数
median_value = np.median(data)
print("中位数是:", median_value)
2
3
4
5
6
7
8
9
# 14.返回 2 个值
class MyClass:
def my_method(self, x: int, y: int) -> tuple[int, int]:
# 这里进行一些操作
result1 = x + y
result2 = x * y
# 返回一个整数元组
return result1, result2
# 创建 MyClass 的实例
my_instance = MyClass()
# 使用实例调用 my_method 方法
result1, result2 = my_instance.my_method(3, 4)
# 打印结果
print(result1) # 输出: 7
print(result2) # 输出: 12
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 15.获取耗时
from datetime import datetime
import time
# 获取当前时间戳并转换为datetime对象
start_time = datetime.fromtimestamp(time.time())
# 程序暂停5秒
time.sleep(5)
# 再次获取当前时间戳并转换为datetime对象
end_time = datetime.fromtimestamp(time.time())
# 计算开始和结束时间的时间差
cost_time = (end_time - start_time)
# 将时间差转换为时分秒格式
hours, remainder = divmod(cost_time.seconds, 3600)
minutes, seconds = divmod(remainder, 60)
# 格式化输出时分秒
formatted_time = f"{hours:02}:{minutes:02}:{seconds:02}"
print(formatted_time)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
00:00:05
# 16.方法调用
在 Python 中,方法通常是用来对对象进行操作的函数。方法可以带有参数,也可以不带参数。当你调用一个方法时,是否使用括号取决于你想要执行的操作。
不带括号:当你引用一个方法而不使用括号时,你实际上是在引用这个方法本身,而不是调用它。这通常用于获取方法的引用,或者在某些情况下,你想要将方法作为参数传递给其他函数。
带括号:当你调用一个方法时,你应该使用括号,即使方法不接受任何参数。括号表示你想要执行这个方法。如果方法需要参数,你还需要在括号内提供这些参数。
例如,假设有一个类UserManager
,它有一个方法get_current_user
:
class UserManager:
def get_current_user(self):
# 这里是获取当前用户的逻辑
return "Current User"
2
3
4
不带括号:
user_manager.get_current_user
这将返回方法对象本身,而不是方法的返回值。带括号:
user_manager.get_current_user()
这将调用get_current_user
方法,并返回它返回的值,例如 "Current User"。
如果你尝试调用一个不接受参数的方法而不使用括号,Python 解释器会抛出一个TypeError
,因为它期望你在调用方法时使用括号。例如:
user_manager = UserManager()
current_user = user_manager.get_current_user() # 正确
# current_user = user_manager.get_current_user # 错误,会引发TypeError
2
3
总结来说,带括号是调用方法的标准方式,而不带括号通常用于引用方法本身。在 Python 中,即使方法不接受任何参数,调用时也应该使用括号。
# 17.处理parqut文件
import pandas as pd
# path = '/Users/qinyingjie/Documents/python-workspace/graphrag-server/ragtest/output/20240923-100846/artifacts/create_final_entities.parquet'
path = '/Users/qinyingjie/Documents/python-workspace/graphrag-server/ragtest/output/20240923-100846/artifacts/create_final_entities.parquet'
df = pd.read_parquet(path)
# 统计行数
num_rows = len(df)
print(f"文件 {path} 包含 {num_rows} 行。")
# 检查是否存在 'weight' 字段
if 'description_embedding' in df.columns:
description_embedding = df['description_embedding'][0]
embedding_dim = len(description_embedding)
print(embedding_dim)
print(f"The embedding dimension is: {embedding_dim}")
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 18.获取异常信息
try:
return SearchResult(
response=response,
context_data=context_records,
context_text=context_text,
completion_time=time.time() - start_time,
llm_calls=1,
prompt_tokens=num_tokens(search_prompt, self.token_encoder),
)
except Exception as e:
log.exception("Exception in _map_response_single_batch")
error_message = str(e) # Capture the exception message
return SearchResult(
response=f"Error: {error_message}", # Store the error message in the response
context_data=context_records,
context_text=context_text,
completion_time=time.time() - start_time,
llm_calls=1,
prompt_tokens=num_tokens(search_prompt, self.token_encoder),
)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 19.接口下载json文件
data_ = result['data']
orig_name = data_['orig_name']
json_data = json.dumps(data_)
safe_filename = urllib.parse.quote(orig_name + ".json")
headers = {
"Content-Disposition": f'attachment; filename="{safe_filename}"; filename*=UTF-8\'{safe_filename}'
}
return http_response(
content=json_data,
media_type="application/json",
headers=headers
)
2
3
4
5
6
7
8
9
10
11
12
# 一.Python工具包
# 1.打包
sh build_push.sh 0.0.1.202404131548
# 2.预约明天
pip install /Users/qinyingjie/Documents/python-workspace/kwan_tools/dist/kwan_tools-0.0.1.202404131548-py3-none-any.whl
# 3.场地预约
order-start config.yaml
# 1.枚举的使用
from enum import Enum
class TaskStatus(Enum):
# 枚举成员名称通常大写
RUNNING = "运行中"
TERMINATED = "已终止"
SUCCESS = "成功"
FAILURE = "失败"
# 使用枚举
task = TaskStatus.RUNNING
print(task) # 输出: TaskStatus.RUNNING
print(task.name) # 输出: RUNNING
print(task.value) # 输出: 运行中
2
3
4
5
6
7
8
9
10
11
12
13
14
# 2. 异步任务终止
from typing import Any
from manager.graph.manager import GraphManager
from manager.schemas.manager import SchemasManager
from utils import logger
from fastapi import APIRouter
from manager.graph.query_graph import LocalQueryGraph
import time
import asyncio
GraphRouter = APIRouter(prefix="/test", tags=["测试管理"])
global task_map
task_map: dict[Any, Any] = {}
class GraphRouterMap:
# 长时间运行的异步任务
@staticmethod
async def long_running_task():
try:
while True:
# 模拟任务运行
print("长时间运行的任务正在运行.....")
await asyncio.sleep(2)
except asyncio.CancelledError:
print("任务被取消")
@staticmethod
@GraphRouter.get("/start", summary="启动任务", description="启动任务")
async def start_task():
# 启动长时间任务,并返回任务对象
run_id = time.strftime("%Y%m%d-%H%M%S")
task = asyncio.create_task(GraphRouterMap.long_running_task())
task_map[run_id] = task
return run_id
@staticmethod
@GraphRouter.get("/stop", summary="中止任务", description="中止任务")
async def stop_task(run_id: str):
task = task_map.get(run_id)
task.cancel()
return {"message": "任务取消请求已发送"}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# 3.详细日志
在 Python 中打印详细的堆栈信息,通常可以通过以下方法实现:
使用traceback
模块:这是 Python 标准库中的一个模块,专门用于打印堆栈信息。当你遇到异常时,可以使用traceback.print_exc()
来打印异常信息和堆栈跟踪。例如:
import traceback
try:
# 你的代码逻辑
# 可能会引发异常的代码
except Exception as e:
traceback.print_exc()
2
3
4
5
6
7
02-路线规划 →