# 一.小工具

# 1.打开单个网页

import webbrowser

url = "https://ask.csdn.net/questions/8078459"  # 将链接替换为你想要打开的实际链接

# 在默认浏览器中打开链接
webbrowser.open(url)
1
2
3
4
5
6

# 2.打开多个网页

import webbrowser

urls = [
    "https://www.example1.com",
    "https://www.example2.com",
    "https://www.example3.com"
]  # 用你想要打开的链接替换这些示例链接

# 循环打开每个链接
for url in urls:
    webbrowser.open(url)
1
2
3
4
5
6
7
8
9
10
11

# 3.Notebook 中执行 Shell

# 方式一
!sh script.sh

# 方式二
import subprocess
subprocess.run(['sh', 'script.sh'])

# 方式三
%%bash
sh script.sh
1
2
3
4
5
6
7
8
9
10

# 4.日志详细打印

logger.error(f"QA error, question: {q}, error: {e}", exc_info=True)
1

# 5.全局日志

import logging
def set_debug_mode():
    logging.basicConfig(level=logging.DEBUG,
                        format='%(asctime)s - %(levelname)s - %(message)s')
    logger = logging.getLogger()

    console_handler = logging.StreamHandler()
    logger.addHandler(console_handler)

    logger.setLevel(logging.DEBUG)

set_debug_mode()
1
2
3
4
5
6
7
8
9
10
11
12

# 6.执行 cmd 命令

if os.name == "nt":
    logger.info("当前系统是 Windows")
    # 执行 cmd 命令并获取输出
    result = subprocess.run(['cmd', '/c', 'title 新空间代码软件开发工作室 Vx:15045666310'], capture_output=True,
                            text=True)
    # 输出命令的执行结果
    logger.info(result.stdout)
1
2
3
4
5
6
7

# 7.obsutil 安装

vim ~/.bash_profile
source ~/.bash_profile

vim ~/.zshrc
source ~/.zshrc
1
2
3
4
5

根据提供的错误信息,看起来是在执行脚本build_push.sh的过程中,出现了obsutil命令未找到的错误。您还提供了obsutil实际所在的目录为/Users/qinyingjie/Documents/software/obsutil_darwin_amd64_5.5.12/obsutil

要解决这个问题,您可以采取以下几个步骤:

  1. 确认路径:首先,请确认obsutil工具的实际路径是否正确。您提供的路径为/Users/qinyingjie/Documents/software/obsutil_darwin_amd64_5.5.12/obsutil,请确保这是obsutil工具的可执行文件所在的路径。

  2. 添加路径到环境变量:将obsutil所在的目录添加到系统的环境变量PATH中,这样系统就能够在任何位置找到obsutil命令。您可以通过编辑相应的配置文件来实现这一点,例如.bashrc.bash_profile.zshrc(具体文件名取决于您使用的 Shell)。在文件中添加以下行:

    export PATH="/Users/qinyingjie/Documents/software/obsutil_darwin_amd64_5.5.12:$PATH"
    
    1

    然后保存文件并重新启动终端或使用source命令使更改生效。

  3. 测试命令:重新运行build_push.sh脚本,看看是否仍然报告obsutil命令未找到的错误。如果一切正常,脚本应该能够找到并执行obsutil命令。

请注意,以上步骤假设您提供的路径确实包含了可执行的obsutil命令文件。如果路径不正确或文件不可执行,您可能需要重新下载或安装obsutil工具,并确保将其正确配置到系统中。

如果问题仍然存在,建议查看脚本文件build_push.sh的内容,确保在调用obsutil命令之前没有其他错误或问题。

# 8.非异步调用异步方法

import asyncio

# 异步方法
async def async_function():
    # 执行异步操作
    await asyncio.sleep(1)
    return "异步方法执行完成"

# 非异步方法
def sync_function():
    # 调用异步方法并获取结果
    result = asyncio.run(async_function())

    # 打印结果
    print(result)

# 调用非异步方法
sync_function()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

# 9.Dockerfile 项目(python)

# 使用官方的 Python 3.11.5 镜像作为基础镜像,并选择较小的 slim 版本
FROM python:3.11.5-slim

# 设置工作目录
WORKDIR /base-agent/

# 将 requirements.txt 复制到工作目录,并使用官方的清华镜像源安装依赖
COPY requirements.txt /base-agent/
RUN pip install --upgrade pip && \
    pip install -r /base-agent/requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple

# 将应用代码复制到工作目录
COPY . /base-agent/

# 清理不需要的文件,减小镜像大小
RUN rm -rf /var/lib/apt/lists/* \
    && rm -rf /base-agent/__pycache__ \
    && rm -rf /base-agent/*.pyc \
    && rm -rf /base-agent/.cache \
    && rm -rf /base-agent/*.log

# 设置容器启动时执行的命令
ENTRYPOINT ["sh", "./run.sh"]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

# 二.集合操作

# 1.字典合并

有一个字典 base_config 和一个 yaml 文件,需要将 yaml 文件中存在但是 base_config 中不存在的字段添加到 base_config 中

extra_config:
  - id: '1'
    name: John
    age: 25
  - id: '2'
    name: Jane
    age: 30
  - id: '3'
    name: Alex
    age: 28
1
2
3
4
5
6
7
8
9
10
import yaml
import json

filename = '0326/config.yaml'
with open(filename, "r", encoding="utf-8") as yaml_file:
    config = yaml.safe_load(yaml_file)
# json_str = json.dumps(config)

base_config = {
    'id': '1',
    'name': 'test1220',
}

# 合并agent的配置,如果
if base_config is not None:
    # 读取yaml的数据
    extra_configs = config['extra_config']
    if extra_configs is not None:
        # 这里是字典
        filtered_config = next(
            (item_config for item_config in extra_configs if item_config['id'] == base_config['id']), None)
        if filtered_config is not None:
            for key, value in filtered_config.items():
                if key not in base_config:
                    base_config[key] = value
    json_str = json.dumps(base_config)
    print(json_str)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

# 2.字符串长度

string = "方法进行确定。"
length = len(string)
print("字符串长度:", length)

1
2
3
4
urls = [
    '11111111111'
    ,
    '22222'
    ,
    '33333'
]

# 循环打开每个链接
for url in urls:
    print(len(url))

1
2
3
4
5
6
7
8
9
10
11
12

# 3.指定位置遍历

arr = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
start_index = -4

for element in arr[start_index:]:
    # 执行遍历操作,对元素进行处理
    print(element)
1
2
3
4
5
6

# 4.删除列表元素

# 使用pop删除
my_list = [1, 2, 3, 4, 5]
removed_element = my_list.pop(2)
print("被移除的元素:", removed_element)
print("列表剩余元素:", my_list)

# 使用切片删除
my_list = [1, 2, 3, 4, 5]
del my_list[:2]
print("删除后的列表:", my_list)
1
2
3
4
5
6
7
8
9
10

# 5.列表推导式

ids = [user_info.id for user_info in users] if users else []
1

# 6.字符串 '[22]' 转换为列表

要将字符串 '[22]' 转换为列表,您可以使用 eval() 函数或者使用 Python 内置的 JSON 模块进行转换。以下是两种方法的示例:

方法 1: 使用 eval() 函数

string = '[22]'
my_list = eval(string)
print(my_list)
1
2
3

方法 2: 使用 JSON 模块

import json

string = '[22]'
my_list = json.loads(string)
print(my_list)
1
2
3
4
5

# 7.字典字节串

# 创建字节串
byte_string = b'hello'

# 访问字节串中的字节
print(byte_string[0])  # 输出:104

# 将字节串解码为普通字符串
decoded_string = byte_string.decode('utf-8')
print(decoded_string)  # 输出:hello

# 将普通字符串编码为字节串
encoded_string = 'world'.encode('utf-8')
print(encoded_string)  # 输出:b'world'

# 字节串之间的拼接
concatenated_bytes = byte_string + encoded_string
print(concatenated_bytes)  # 输出:b'helloworld'

# 字典字节字符串
dictionary = {b'relationship': Value(sVal=b'\xe5\x8c\x85\xe6\x8b\xac')}
sVal_value = dictionary[b'relationship'].sVal.decode('utf-8')
print(sVal_value)

# 字典字节字符串
dictionary = {b'relationship': Value(sVal=b'\xe5\x8c\x85\xe6\x8b\xac')}
sVal_value = dictionary[b'relationship'].sVal
print(sVal_value)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

# 8.列表推导式

首先,我们需要一个函数来获取配置文件中的特定键值:

def get_config_key(key):
    # 假设config_data是从配置文件中加载的数据
    config_data = {
        "name": [
            {"type": "web", "name": "Server1"},
            {"type": "db", "name": "Database1"}
        ]
    }
    return config_data.get(key, [])
1
2
3
4
5
6
7
8
9

这个函数get_config_key接收一个键名作为参数,并返回与该键名相关联的值。如果键不存在,它将返回一个空列表。

接下来,我们使用列表推导式来构建一个新的数据结构:

names = get_config_key("name")
config['name_server'] = [{item['type']: item['name']} for item in names]
1
2

在这段代码中,我们首先调用get_config_key函数来获取所有名称条目。然后,我们使用列表推导式来创建一个新的列表,其中的每个元素都是一个字典,这个字典包含服务器的类型作为键,服务器的名称作为值。

# 三.文件操作

# 1.yaml 读取

在 Python 中,我们可以使用 PyYAML 库来读取和解析 YAML 数据。首先,我们需要安装 PyYAML 库,可以使用 pip 命令进行安装:

pip install pyyaml
1

安装完成后,我们可以使用以下代码读取 YAML 文件并解析其中的数据:

import yaml

filename = 'data.yaml'

try:
    with open(filename, 'r', encoding="utf-8") as file:
        yaml_data = yaml.safe_load(file)
        print(yaml_data)
except FileNotFoundError:
    print(f"File '{filename}' not found.")
except yaml.YAMLError as e:
    print(f"Error while loading YAML: {e}")
except Exception as e:
    print(f"An error occurred: {e}")
1
2
3
4
5
6
7
8
9
10
11
12
13
14

yaml数据

person:
  name: John Smith
  age: 30
  hobbies:
    - reading
    - hiking
1
2
3
4
5
6

访问方式:

print(yaml_data['person']['name'])  # 输出:John Smith
print(yaml_data['person']['age'])  # 输出:30
print(yaml_data['person']['hobbies'])  # 输出:['reading', 'hiking']
1
2
3

# 2.使用方式

要在 YAML 文件中添加一个名为agent_config的数组,每个数组元素都是一个对象,对象包含idnameage三个字段,可以使用以下示例:

agent_config:
  - id: 1
    name: John
    age: 25
  - id: 2
    name: Jane
    age: 30
  - id: 3
    name: Alex
    age: 28
1
2
3
4
5
6
7
8
9
10

在这个示例中,agent_config是数组的键名,后面的每个缩进的部分表示数组的元素。每个元素都是一个对象,使用键值对的形式表示,其中idnameage是字段名,后面是对应的值。

如果你想在 Python 代码中动态添加这样的数组数据并将其写入 YAML 文件,可以使用yaml库提供的方法。以下是一个示例:

import yaml

data = {
    'agent_config': [
        {'id': 1, 'name': 'John', 'age': 25},
        {'id': 2, 'name': 'Jane', 'age': 30},
        {'id': 3, 'name': 'Alex', 'age': 28}
    ]
}

filename = 'config.yaml'
with open(filename, 'w', encoding='utf-8') as yaml_file:
    yaml.dump(data, yaml_file)
1
2
3
4
5
6
7
8
9
10
11
12
13

在这个示例中,我们创建了一个字典data,其中包含一个键为agent_config的数组,数组的元素是包含idnameage字段的字典对象。然后,我们将data字典写入到名为config.yaml的 YAML 文件中。

# 3.json 数据写入 json 文件

import json

# 生成要写入的JSON数据
data = {
    "name": "John",
    "age": 30,
    "city": "New York"
}
1
2
3
4
5
6
7
8
# 指定要写入的JSON文件路径
file_path = "path/to/your/file.json"

# 将JSON数据写入文件
with open(file_path, "w") as json_file:
    json.dump(data, json_file)
1
2
3
4
5
6

# 4.读取目录下的文件

import os

def read_parquet_file(directory):
    """
    读取指定目录下的所有.parquet文件
    :param directory: 要搜索的目录路径
    :return: 包含所有Parquet文件名的列表
    """
    data = []
    for root, dirs, files in os.walk(directory):
        for file in files:
            # 检查文件扩展名是否为.parquet
            if file.endswith('.parquet'):
                # 构建文件的完整路径
                full_path = os.path.join(root, file)
                # 读取Parquet文件,这里假设使用pandas
                df = pd.read_parquet(full_path)
                # 将DataFrame添加到数据列表中
                data.append(df)
    return data
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

# 5.Parquet 转 Excel

import pyarrow.parquet as pq
import pandas as pd
import os

def parquet2excel(file_name):
    """
    将.parquet文件转换为Excel文件
    :param file_name: Parquet文件的路径
    :return: 无返回值,但会打印保存路径
    """
    # 读取Parquet文件
    parquet_file = pq.ParquetFile(file_name)
    data = parquet_file.read().to_pandas()

    # 将读取的数据转换为pandas DataFrame
    df = pd.DataFrame(data)

    # 构建Excel文件的保存路径
    excel_path = file_name.replace('.parquet', '.xlsx')

    # 确保保存路径的目录存在
    os.makedirs(os.path.dirname(f'excel/{excel_path}'), exist_ok=True)

    # 将DataFrame写入Excel文件
    df.to_excel(f'excel/{excel_path}', index=False)

    # 打印保存路径
    print(f'数据已保存到 {excel_path}')
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

# 6.excel 转 text 文件

import pandas as pd

def excel_insert_sql(file_path: str):
    """
    根据excel内容生成insert语句
    :param file_path: Excel文件的路径
    :return: SQL插入语句字符串
    """
    res = ''
    # 如果Excel文件有多个sheet,可以通过sheet_name参数指定要读取的sheet
    data = pd.read_excel(file_path, sheet_name='Sheet1')
    for index, row in data.iterrows():
        name = row['name']
        name = name.replace('"', '')
        description = row['description']
        description = str(description).replace('"', '')
        print(f'insert vertex entity values "{name}":("{description}");\n')
        res += f'insert vertex entity values "{name}":("{description}");\n'

        # 根据需要进行数据处理
    # 例如,获取某一列的数据
    # column_data = data['column_name']
    return res

def save_to_text(insert_sql: str, file_path: str):
    """
    保存SQL插入语句到文本文件中。
    :param insert_sql: SQL插入语句字符串。
    :param file_path: 要保存的文件路径,默认为当前目录下的'output.txt':return: None
    """
    with open(file_path, 'w', encoding='utf-8') as file:
        file.write(insert_sql)

if __name__ == '__main__':
    entities = 'create_final_entities'
    file_path = f'excel/{entities}.xlsx'
    res = excel_insert_sql(file_path=file_path)
    save_to_text(insert_sql=res, file_path=f'txt/{entities}.txt')
    print(f'数据处理完成')
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40

# 四.开发常用

# 1.全局参数

import json

# 读取配置文件
with open('config.json') as f:
    config = json.load(f)

# 在全局字典中存储配置参数
global_params = config

def my_function():
    # 在函数中使用全局配置参数
    print(global_params['param1'])

my_function()  # 输出配置文件中的参数值
1
2
3
4
5
6
7
8
9
10
11
12
13
14

# 2.变量+占位

def template_history(query: str):
    content = ("历史消息:{history}\n"
               f"{query}")
    return content
1
2
3
4

# 3.受保护属性

在 Python 中,有两种约定用于指示某个属性或方法是受保护的,即不应该被外部直接访问或使用。这些约定是:

  1. 单下划线前缀(_): 在属性或方法的名称前加上单个下划线,例如 _protected_variable_protected_method()。这个约定表示该属性或方法是受保护的,建议外部代码不要直接访问或使用它。虽然这只是一个约定,并没有真正限制外部代码的访问,但它向其他开发人员传达了一个警示,表明这是一个内部实现细节,可能会有变化。

  2. 双下划线前缀(__): 在属性或方法的名称前加上双下划线,例如 __private_variable__private_method()。这个约定表示该属性或方法是私有的,应该在类的内部使用,不应该被子类或外部代码直接访问。Python 使用名称修饰(name mangling)来对双下划线前缀进行变换,以避免与子类中相同名称的属性或方法冲突。例如,__private_variable 在类内部被转换为 _ClassName__private_variable

在 Python 中,__dict__ 是一个特殊属性,它是一个字典(dictionary),用于存储对象的属性和方法。该字典将对象的属性名作为键,对应的属性值作为值。

通过访问 __dict__ 属性,您可以获取对象的属性和方法字典,从而查看、修改或删除对象的属性和方法。这使得您可以动态地操作对象的属性,而不需要事先知道属性的名称。

# 4.联合类型

使用|进行类型提示(Python 3.10+)

def process_value(value: int | str) -> None:
    if isinstance(value, int):
        print(f"Integer: {value}")
    elif isinstance(value, str):
        print(f"String: {value}")

# 使用
process_value(10)      # 输出: Integer: 10
process_value("Hello")  # 输出: String: Hello
1
2
3
4
5
6
7
8
9

使用typing.Union进行类型提示(Python 3.5 - 3.9)

from typing import Union

def process_value(value: Union[int, str]) -> None:
    if isinstance(value, int):
        print(f"Integer: {value}")
    elif isinstance(value, str):
        print(f"String: {value}")
1
2
3
4
5
6
7

# 5.多返回值

import numpy as np

def process_array(arr):
    min_val = np.min(arr)
    max_val = np.max(arr)
    return min_val, max_val

# 创建一个二维数组
array = np.array([[1, 2, 3], [4, 5, 6]])

# 使用函数
min_val, max_val = process_array(array)
print(f"Min: {min_val}, Max: {max_val}")
1
2
3
4
5
6
7
8
9
10
11
12
13

# 6.Optional 使用

在 Python 中,Optional[str] 是一种类型提示,用于表示一个变量可以是 str 类型或 None 类型。这种类型提示通常用于函数参数或返回值,表示该参数或返回值可以为空。

下面是一个使用 Optional[str] 的示例:

def greet(name: Optional[str] = None) -> str:
    if name:
        return f"Hello, {name}!"
    else:
        return "Hello, stranger!"

# 调用函数
print(greet("Alice"))  # 输出: "Hello, Alice!"
print(greet())  # 输出: "Hello, stranger!"
1
2
3
4
5
6
7
8
9

在这个例子中:

  1. 函数 greet 的参数 name 被声明为 Optional[str] 类型。这意味着它可以接受一个字符串类型的值,也可以接受 None 值。
  2. 如果 name 参数被传入,函数将返回 "Hello, {name}!" 字符串。
  3. 如果 name 参数为 None(即未传入值),函数将返回 "Hello, stranger!" 字符串。

使用 Optional[str] 类型提示可以更好地理解代码的预期行为,并在编译时捕获一些潜在的错误。

# 7.提取 json 字符串

为了匹配第一个 { 和最后一个 } 之间的内容,包括 {} 本身,我们可以使用以下正则表达式:

import re

answer_ = """-----key:answer, value:```json
{
  "question1": "鲁迅是什么朝代的?",
  "question2": "鲁迅是谁?",
  "question3": "鲁迅原名是?"
}
```"""

# 正则表达式匹配第一个{和最后一个}之间的内容
pattern = r'\{.*?\}'

match = re.search(pattern, answer_, re.DOTALL)
if match:
    result = match.group(0).strip()  # 使用strip()去除可能的前后空白字符
else:
    result = "没有找到匹配的内容"

print(result)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

这个正则表达式 \{.*?\} 会匹配第一个 { 到最后一个 } 之间的内容。re.DOTALL 标志允许 . 匹配包括换行符在内的任何字符。

# 8.装饰器

下面是一个简单的日志记录装饰器示例:

import functools
import time

def log_decorator(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        print(f"Calling {func.__name__}")
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        print(f"{func.__name__} executed in {end_time - start_time:.2f} seconds")
        return result
    return wrapper

@log_decorator
def add(a, b):
    time.sleep(1)  # 模拟耗时操作
    return a + b

result = add(5, 3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

# 9.关键字

1. Python 关键字简介

Python 关键字是一些保留的词汇,它们在 Python 中有特殊的含义,用于定义程序的结构和流程。根据 Python 官方文档,截至 Python 3.10,共有 35 个关键字:

  • False, None, True, and, as, assert, async, await, break, class, continue, def, del, elif, else, except, finally, for, from, global, if, import, in, is, lambda, nonlocal, not, or, pass, raise, return, try, while, with, yield

这些关键字覆盖了从条件判断到循环控制,从异常处理到类和函数定义等多个方面。

2. 关键字的分类

Python 关键字可以根据它们的用途进行分类:

  • 逻辑控制关键字and, not, or, if, elif, else, for, while
  • 定义和声明关键字class, def, global, nonlocal, lambda
  • 异常处理关键字try, except, finally, raise
  • 模块和包处理关键字import, from, as
  • 赋值和迭代关键字in, is, return, yield
  • 其他关键字False, None, True, assert, async, await, break, continue, del, pass, with

3. 关键字的用法

每个关键字都有其特定的用法和含义:

  • if, elif, else:用于条件判断。

    if x > 0:
        print("Positive")
    elif x == 0:
        print("Zero")
    else:
        print("Negative")
    
    1
    2
    3
    4
    5
    6
  • for, while:用于循环控制。

    for i in range(5):
        print(i)
    
    while x < 10:
        x += 1
    
    1
    2
    3
    4
    5
  • class, def:用于定义类和函数。

    class MyClass:
        pass
    
    def my_function():
        pass
    
    1
    2
    3
    4
    5
  • try, except, finally:用于异常处理。

    try:
        # 尝试执行的代码
    except Exception as e:
        # 处理异常
    finally:
        # 无论是否发生异常都会执行的代码
    
    1
    2
    3
    4
    5
    6
  • import, from, as:用于导入模块和包。

    import os
    from sys import argv
    import numpy as np
    
    1
    2
    3
  • return, yield:用于从函数返回值或生成器产生值。

    def my_function():
        return "Hello, World!"
    
    def my_generator():
        yield 1
        yield 2
    
    1
    2
    3
    4
    5
    6

# 10.时间戳比较

在 Python 中,可以使用datetime.fromtimestamp()方法将时间戳转换为datetime对象。如果时间戳是毫秒级,需要将其除以 1000 转换为秒。以下是转换的示例代码:

from datetime import datetime

# 假设的时间戳(毫秒级)
create_time_timestamp = 1716548680000

# 将毫秒级时间戳转换为秒
create_time_seconds = create_time_timestamp / 1000

# 转换为datetime对象
create_time = datetime.fromtimestamp(create_time_seconds)

# 指定的日期
compare_date = datetime(2024, 1, 1)

# 比较时间
if create_time > compare_date:
    print("createTime is greater than 2024-01-01")
else:
    print("createTime is not greater than 2024-01-01")
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

# 11.获取周一

要编写一个 Python 程序,生成从 2024 年 1 月 1 日到今天(2024 年 6 月 1 日)的所有周一的日期,并打开每个日期对应的网页链接,你可以使用以下代码:

import webbrowser
from datetime import datetime, timedelta

def generate_mondays(start_date, end_date):
    # 初始化一个空列表来存储周一的日期
    mondays = []

    # 从开始日期开始循环,直到结束日期
    current_date = start_date
    while current_date <= end_date:
        if current_date.weekday() == 0:  # weekday() 返回0代表周一
            mondays.append(current_date)
        current_date += timedelta(days=1)

    return mondays

def open_webpages(mondays):
    # 为每个周一生成URL并打开网页
    base_url = "https://mp.csdn.net/poster/"
    for monday in mondays:
        url = base_url + monday.strftime("%Y%m%d")
        webbrowser.open_new(url)  # 使用 open_new 来在新标签页中打开网页

# 设置起始日期为2024年1月1日
start_date = datetime(2024, 1, 1)
# 设置结束日期为今天
end_date = datetime(2024, 6, 1)

# 生成所有周一的日期列表
mondays = generate_mondays(start_date, end_date)

# 打开每个周一的网页
open_webpages(mondays)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

这段代码首先定义了一个函数generate_mondays来计算指定范围内的所有周一的日期,然后定义了open_webpages函数来为每个周一生成 URL 并打开网页。最后,它设置了起始日期和结束日期,并调用这些函数。

请注意,这段代码会依次打开从 2024 年 1 月 1 日到 2024 年 6 月 1 日之间所有周一的网页。如果你希望同时打开所有网页,可以修改webbrowser.open_new(url)webbrowser.open(url),但这样可能会导致浏览器标签页过多,使用户难以管理。使用open_new可以让用户在浏览器中逐个查看每个标签页。

# 12.列表推导式

基本语法:
1

列表推导式的基本语法结构如下:

[expression for item in iterable if condition]
1
  • expression:新列表中的元素表达式。
  • item:从可迭代对象iterable中迭代得到的元素。
  • condition:一个可选的条件表达式,用于过滤满足条件的元素。
简单示例:
1

让我们从一个简单的例子开始,假设我们有一个数字列表,我们想要创建一个新的列表,其中只包含原列表中的偶数:

original_list = [1, 2, 3, 4, 5, 6]
even_numbers = [num for num in original_list if num % 2 == 0]
print(even_numbers)  # 输出: [2, 4, 6]
循环比较:
1
2
3
4

列表推导式与循环的比较

使用列表推导式与使用传统的 for 循环相比,代码更加简洁,可读性更强。以下是使用 for 循环实现相同功能的代码:

original_list = [1, 2, 3, 4, 5, 6]
even_numbers = []
for num in original_list:
    if num % 2 == 0:
        even_numbers.append(num)
print(even_numbers)  # 输出: [2, 4, 6]
嵌套列表推导式:
1
2
3
4
5
6
7

列表推导式可以嵌套使用,用于处理更复杂的数据结构,如二维数组的转置:

matrix = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
transposed = [list(row) for row in zip(*matrix)]
print(transposed)  # 输出: [[1, 4, 7], [2, 5, 8], [3, 6, 9]]
包含多个 for 子句的列表推导式:
1
2
3
4

列表推导式可以包含多个 for 子句,用于笛卡尔积等操作:

from itertools import product
x = [1, 2, 3]
y = ['a', 'b']
pairs = [(x, y) for x in x for y in y]
print(pairs)  # 输出: [(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b'), (3, 'a'), (3, 'b')]
使用字典推导式:
1
2
3
4
5
6

列表推导式不仅可以用于列表,还可以用于创建字典:

keys = ['a', 'b', 'c']
values = [1, 2, 3]
my_dict = {k: v for k, v in zip(keys, values)}
print(my_dict)  # 输出: {'a': 1, 'b': 2, 'c': 3}
1
2
3
4

# 13.字典遍历

在 Python 中,遍历字典({})有几种不同的方法,主要取决于你想要遍历字典的键(keys)、值(values)还是键值对(items)。

# 1.遍历所有键

使用.keys()方法可以获取字典中所有的键,然后通过循环遍历这些键。

my_dict = {'a': 1, 'b': 2, 'c': 3}
for key in my_dict.keys():
    print(key)
1
2
3

或者更简洁的方式是直接在字典对象上使用for循环:

for key in my_dict:
    print(key)
1
2

# 2.遍历所有值

使用.values()方法可以获取字典中所有的值,然后通过循环遍历这些值。

for value in my_dict.values():
    print(value)
1
2

# 3.遍历键值对

使用.items()方法可以获取字典中的所有键值对,返回一个包含(key, value)元组的视图对象。

for key, value in my_dict.items():
    print(key, value)
1
2

# 4.使用列表推导式

如果你需要从字典中提取信息并创建列表,可以使用列表推导式。

  • 遍历键:

    keys_list = [key for key in my_dict]
    
    1
  • 遍历值:

    values_list = [value for value in my_dict.values()]
    
    1
  • 遍历键值对:

    items_list = [(key, value) for key, value in my_dict.items()]
    
    1

# 5.使用字典推导式

如果你需要基于现有字典创建一个新的字典,可以使用字典推导式。

new_dict = {key: value * 2 for key, value in my_dict.items()}
1

# 6.使用 next()函数

如果你需要访问字典的第一个键值对,可以使用next()函数。

first_item = next(iter(my_dict.items()), None)
1

# 7.使用 get()方法

如果你需要安全地访问字典中的值,可以使用get()方法,它允许你指定默认值。

value = my_dict.get('key', 'default_value')
1

# 14.数字的布尔值

0 的 bool 值:

if 0:
    print(True)
else:
    print(False)

1
2
3
4
5

返回 False

-1 的 bool 值:

if -1:
    print(True)
else:
    print(False)

1
2
3
4
5

返回 True

# 15. 第三方库 simplejson

simplejson是一个第三方库,它是 Python 内置json模块的一个分支,提供了一些额外的功能和性能优化。以下是simplejson的一些特点:

  • 性能:在某些场景下,simplejson可能比内置的json模块有更好的性能表现,尤其是在处理大量数据时。
  • 额外的序列化选项simplejson提供了一些额外的序列化选项,如use_decimal,允许使用Decimal类型而不是默认的float
  • 更好的错误信息:在发生错误时,simplejson可能会提供更详细的错误信息,有助于调试。
  • 兼容性simplejson在某些情况下提供了更好的兼容性,尤其是在处理一些边缘 JSON 格式时。

# 五.缓存设计

# 1. 环境准备

在 Python 中实现缓存,我们可能会用到标准库中的functools.lru_cache装饰器,或者使用第三方库如cachetools。以下是安装cachetools的方法:

pip install cachetools
1

# 2. 使用functools.lru_cache

Python 标准库中的functools模块提供了一个非常有用的装饰器lru_cache,它可以实现最近最少使用(Least Recently Used,LRU)缓存。这意味着它会缓存最近调用的函数的结果,当缓存满了之后,会淘汰掉最久未被使用的缓存项。

以下是一个使用lru_cache的示例:

import functools

@functools.lru_cache(maxsize=128)
def fibonacci(n):
    if n < 2:
        return n
    return fibonacci(n-1) + fibonacci(n-2)

# 计算斐波那契数列的第10项
print(fibonacci(10))
1
2
3
4
5
6
7
8
9
10

在这个示例中,fibonacci函数的结果会被缓存,当再次请求相同的参数时,会直接从缓存中获取结果,而不是重新计算。

# 3. 使用cachetools

cachetools是一个提供多种缓存策略的第三方库。它支持 LRU、LFU(Least Frequently Used,最少使用频率)和 RR(Random Replacement,随机替换)等策略。

以下是一个使用cachetools实现 LRU 缓存的示例:

from cachetools import LRUCache

def expensive_function(x):
    # 模拟一个耗时的操作
    return x * x

cache = LRUCache(maxsize=100)

def cached_expensive_function(x):
    if x not in cache:
        cache[x] = expensive_function(x)
    return cache[x]

# 使用缓存的函数
print(cached_expensive_function(4))
print(cached_expensive_function(4))  # 第二次调用将使用缓存的结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

在这个示例中,我们创建了一个LRUCache对象,并使用它来缓存expensive_function函数的结果。

# 4. 文件系统缓存

在某些情况下,我们可能需要将缓存数据持久化到文件系统中。这可以通过将数据序列化并存储到文件中来实现。以下是一个简单的文件系统缓存示例:

import json
import os

CACHE_FILE = 'cache.json'

def load_cache():
    if os.path.exists(CACHE_FILE):
        with open(CACHE_FILE, 'r') as f:
            return json.load(f)
    return {}

def save_cache(cache):
    with open(CACHE_FILE, 'w') as f:
        json.dump(cache, f)

cache = load_cache()

def get_data(key):
    if key in cache:
        return cache[key]
    else:
        # 模拟获取数据的过程
        data = f"data for {key}"
        cache[key] = data
        save_cache(cache)
        return data

# 使用文件系统缓存
print(get_data("key1"))
print(get_data("key1"))  # 第二次调用将使用缓存的数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

在这个示例中,我们使用 JSON 文件作为缓存存储,并在需要时加载和保存缓存数据。

# 5.excel 行数

from openpyxl import load_workbook

# 指定Excel文件路径
excel_path = 'test_01.xlsx'

# 加载工作簿
workbook = load_workbook(filename=excel_path)

# 选择活动工作表,默认是第一个工作表
sheet = workbook.active

# 获取工作表的行数
row_count = sheet.max_row

print(f'Excel file has {row_count} rows.')
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

# 6.判断类型

if vocab_table_ids is not None and isinstance(vocab_table_ids, str):
    # 这里是当 vocab_table_ids 是字符串时执行的代码
1
2

# 7.pathlib 使用

现在,让我们回到最初的问题,深入分析以下代码片段:

search_path = Path(self._root_dir) / (base_dir or "")
1

这行代码是pathlib模块的一个典型用法,它展示了如何使用Path对象和条件表达式来构建一个搜索路径。下面是对这行代码的逐部分解释:

  1. Path(self._root_dir):这里创建了一个Path对象,其路径由实例变量self._root_dir指定。这个变量代表了类的实例所关联的根目录路径。

  2. /:这是Path对象的路径连接运算符,用于将self._root_dirbase_dir连接起来。

  3. (base_dir or ""):这是一个 Python 的条件表达式,用于确定要连接的第二个路径部分。如果base_dirNone或者任何被认为是False的值(如空字符串、0 等),则表达式的结果将是一个空字符串。这样,即使base_dir未定义或为None,代码也不会抛出错误。

通过这种方式,search_path变量将存储一个由self._root_dirbase_dir(如果提供了的话)组合而成的新路径。如果base_dirNone或未提供,search_path将只包含self._root_dir的值。

# 8.异步请求

在 Python 里如果用异步函数,就里里外外都需要使用异步函数。否则在异步函数里面使用 time.sleep 这种会阻塞整个事件循环。

模拟网络请求的关系:

  • time.sleep <===> requests

  • async.sleep <===> aiohttp

我新增的几个投放、更新卡片的接口是用 aiohttp 支持的

# 异步sleep
await asyncio.sleep(random())

# 同步sleep
time.sleep(random())
1
2
3
4
5

下面是一个使用aiohttp进行异步 HTTP 请求的例子:

import aiohttp
import asyncio

async def fetch(url, session):
    async with session.get(url) as response:
        return await response.text()

async def main():
    url = 'http://example.com'
    async with aiohttp.ClientSession() as session:
        html = await fetch(url, session)
        print(html)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

# 9.查看包的大小

du -mh  /Users/qinyingjie/miniconda3/envs/llm-graph-builder/lib/python3.11/site-packages/transformers
1

# 10.并发请求

# 并发请求
from concurrent.futures import ThreadPoolExecutor
import concurrent.futures
futures = []
with ThreadPoolExecutor(max_workers=10) as executor:
    for chunk in combined_chunk_document_list:
        chunk_doc = Document(
            page_content=chunk.page_content.encode("utf-8"), metadata=chunk.metadata
        )
        futures.append(
            executor.submit(llm_transformer.convert_to_graph_documents, [chunk_doc])
        )

    for i, future in enumerate(concurrent.futures.as_completed(futures)):
        graph_document = future.result()
        graph_document_list.append(graph_document[0])
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

# 11.判断代码

enable_user_agent = os.environ.get("ENABLE_USER_AGENT", "False").lower() in ("true", "1", "yes")
1

# 12.执行 shell 命令

在 Python 中,你可以使用几种方法来执行 shell 命令。以下是一些常用的方法:

  1. os.system(): 这是最简单的方法,它执行一个指定的命令字符串。返回值是命令的退出状态码。

    import os
    result = os.system('ls -l')
    print("Exit status:", result)
    
    1
    2
    3
  2. subprocess.run() (推荐方式): subprocess模块提供了更多的灵活性和强大的功能来执行和管理子进程。

    import subprocess
    result = subprocess.run(['ls', '-l'], capture_output=True, text=True)
    print("Output:", result.stdout)
    
    1
    2
    3
  3. subprocess.Popen(): 如果你需要更细粒度的控制,可以使用Popen。这个方法允许你执行一个命令并与之交互。

    import subprocess
    process = subprocess.Popen(['ls', '-l'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    stdout, stderr = process.communicate()
    print("Output:", stdout)
    if process.returncode != 0:
        print("Error:", stderr)
    
    1
    2
    3
    4
    5
    6
  4. os.popen(): 这个方法可以用来执行命令并读取输出。

    with os.popen('ls -l') as stream:
        print(stream.read())
    
    1
    2
  5. subprocess.check_output(): 这个方法用于执行命令并获取输出,如果命令返回非零退出状态码,会抛出异常。

    from subprocess import check_output
    output = check_output(['ls', '-l'], text=True)
    print("Output:", output)
    
    1
    2
    3
  6. subprocess.call(): 这个方法执行一个命令并等待其完成,返回命令的退出状态码。

    import subprocess
    status = subprocess.call(['ls', '-l'])
    print("Exit status:", status)
    
    1
    2
    3

使用这些方法时,记得处理异常和错误,特别是当命令失败时。subprocess模块是执行 shell 命令的首选方式,因为它提供了更多的控制和灵活性。

# 13.中位数的计算方法

import numpy as np

# 假设我们有一组数据
data = np.array([1, 3, 5, 7, 9])

# 使用 np.median 计算中位数
median_value = np.median(data)

print("中位数是:", median_value)
1
2
3
4
5
6
7
8
9

# 14.返回 2 个值

class MyClass:
  def my_method(self, x: int, y: int) -> tuple[int, int]:
      # 这里进行一些操作
      result1 = x + y
      result2 = x * y
      # 返回一个整数元组
      return result1, result2

# 创建 MyClass 的实例
my_instance = MyClass()

# 使用实例调用 my_method 方法
result1, result2 = my_instance.my_method(3, 4)

# 打印结果
print(result1)  # 输出: 7
print(result2)  # 输出: 12
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

# 15.获取耗时

from datetime import datetime
import time

# 获取当前时间戳并转换为datetime对象
start_time = datetime.fromtimestamp(time.time())

# 程序暂停5秒
time.sleep(5)

# 再次获取当前时间戳并转换为datetime对象
end_time = datetime.fromtimestamp(time.time())

# 计算开始和结束时间的时间差
cost_time = (end_time - start_time)

# 将时间差转换为时分秒格式
hours, remainder = divmod(cost_time.seconds, 3600)
minutes, seconds = divmod(remainder, 60)

# 格式化输出时分秒
formatted_time = f"{hours:02}:{minutes:02}:{seconds:02}"
print(formatted_time)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

00:00:05

# 16.方法调用

在 Python 中,方法通常是用来对对象进行操作的函数。方法可以带有参数,也可以不带参数。当你调用一个方法时,是否使用括号取决于你想要执行的操作。

  1. 不带括号:当你引用一个方法而不使用括号时,你实际上是在引用这个方法本身,而不是调用它。这通常用于获取方法的引用,或者在某些情况下,你想要将方法作为参数传递给其他函数。

  2. 带括号:当你调用一个方法时,你应该使用括号,即使方法不接受任何参数。括号表示你想要执行这个方法。如果方法需要参数,你还需要在括号内提供这些参数。

例如,假设有一个类UserManager,它有一个方法get_current_user

class UserManager:
    def get_current_user(self):
        # 这里是获取当前用户的逻辑
        return "Current User"
1
2
3
4
  • 不带括号user_manager.get_current_user 这将返回方法对象本身,而不是方法的返回值。

  • 带括号user_manager.get_current_user() 这将调用get_current_user方法,并返回它返回的值,例如 "Current User"。

如果你尝试调用一个不接受参数的方法而不使用括号,Python 解释器会抛出一个TypeError,因为它期望你在调用方法时使用括号。例如:

user_manager = UserManager()
current_user = user_manager.get_current_user()  # 正确
# current_user = user_manager.get_current_user   # 错误,会引发TypeError
1
2
3

总结来说,带括号是调用方法的标准方式,而不带括号通常用于引用方法本身。在 Python 中,即使方法不接受任何参数,调用时也应该使用括号。

# 17.处理parqut文件

import pandas as pd

# path = '/Users/qinyingjie/Documents/python-workspace/graphrag-server/ragtest/output/20240923-100846/artifacts/create_final_entities.parquet'
path = '/Users/qinyingjie/Documents/python-workspace/graphrag-server/ragtest/output/20240923-100846/artifacts/create_final_entities.parquet'

df = pd.read_parquet(path)
# 统计行数
num_rows = len(df)
print(f"文件 {path} 包含 {num_rows} 行。")

# 检查是否存在 'weight' 字段
if 'description_embedding' in df.columns:
    description_embedding = df['description_embedding'][0]
    embedding_dim = len(description_embedding)
    print(embedding_dim)
    print(f"The embedding dimension is: {embedding_dim}")
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

# 18.获取异常信息

try:
    return SearchResult(
        response=response,
        context_data=context_records,
        context_text=context_text,
        completion_time=time.time() - start_time,
        llm_calls=1,
        prompt_tokens=num_tokens(search_prompt, self.token_encoder),
    )
except Exception as e:
    log.exception("Exception in _map_response_single_batch")
    error_message = str(e)  # Capture the exception message
    return SearchResult(
        response=f"Error: {error_message}",  # Store the error message in the response
        context_data=context_records,
        context_text=context_text,
        completion_time=time.time() - start_time,
        llm_calls=1,
        prompt_tokens=num_tokens(search_prompt, self.token_encoder),
    )
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

# 19.接口下载json文件

 data_ = result['data']
 orig_name = data_['orig_name']
 json_data = json.dumps(data_)
 safe_filename = urllib.parse.quote(orig_name + ".json")
 headers = {
     "Content-Disposition": f'attachment; filename="{safe_filename}"; filename*=UTF-8\'{safe_filename}'
 }
 return http_response(
     content=json_data,
     media_type="application/json",
     headers=headers
 )
1
2
3
4
5
6
7
8
9
10
11
12

# 一.Python工具包

# 1.打包

sh build_push.sh  0.0.1.202404131548
1

# 2.预约明天

pip install  /Users/qinyingjie/Documents/python-workspace/kwan_tools/dist/kwan_tools-0.0.1.202404131548-py3-none-any.whl
1

# 3.场地预约

order-start config.yaml
1

# 1.枚举的使用

from enum import Enum

class TaskStatus(Enum):
    # 枚举成员名称通常大写
    RUNNING = "运行中"
    TERMINATED = "已终止"
    SUCCESS = "成功"
    FAILURE = "失败"

# 使用枚举
task = TaskStatus.RUNNING
print(task)  # 输出: TaskStatus.RUNNING
print(task.name)  # 输出: RUNNING
print(task.value)  # 输出: 运行中
1
2
3
4
5
6
7
8
9
10
11
12
13
14

# 2. 异步任务终止

from typing import Any

from manager.graph.manager import GraphManager
from manager.schemas.manager import SchemasManager
from utils import logger
from fastapi import APIRouter
from manager.graph.query_graph import LocalQueryGraph
import time
import asyncio

GraphRouter = APIRouter(prefix="/test", tags=["测试管理"])
global task_map
task_map: dict[Any, Any] = {}

class GraphRouterMap:

    # 长时间运行的异步任务
    @staticmethod
    async def long_running_task():
        try:
            while True:
                # 模拟任务运行
                print("长时间运行的任务正在运行.....")
                await asyncio.sleep(2)
        except asyncio.CancelledError:
            print("任务被取消")

    @staticmethod
    @GraphRouter.get("/start", summary="启动任务", description="启动任务")
    async def start_task():
        # 启动长时间任务,并返回任务对象
        run_id = time.strftime("%Y%m%d-%H%M%S")
        task = asyncio.create_task(GraphRouterMap.long_running_task())
        task_map[run_id] = task
        return run_id

    @staticmethod
    @GraphRouter.get("/stop", summary="中止任务", description="中止任务")
    async def stop_task(run_id: str):
        task = task_map.get(run_id)
        task.cancel()
        return {"message": "任务取消请求已发送"}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42

# 3.详细日志

在 Python 中打印详细的堆栈信息,通常可以通过以下方法实现:

使用traceback模块:这是 Python 标准库中的一个模块,专门用于打印堆栈信息。当你遇到异常时,可以使用traceback.print_exc()来打印异常信息和堆栈跟踪。例如:

import traceback

try:
    # 你的代码逻辑
    # 可能会引发异常的代码
except Exception as e:
    traceback.print_exc()
1
2
3
4
5
6
7
上次更新: 11/17/2024, 2:27:28 PM