技术难题解决分析与方案选择

news/2025/3/12 19:04:29/文章来源:https://www.cnblogs.com/03-12/p/18768273

技术难题解决分析与方案选择

好的,以下是基于你提供的框架,生成的一个更完整、更详细的解答,并加入了面试官可能会如何评判的分析:

面试题:请描述您在项目中遇到的最具挑战性的技术难题,以及您是如何解决的?

我的解答(以解决数据库死锁问题为例):

“在我参与的一个电商平台的订单管理系统开发中,我们遇到了一个非常棘手的数据库死锁问题,这可以说是我职业生涯中遇到的最具挑战性的技术难题之一。

问题背景: 当时系统上线初期运行平稳,但随着用户量和订单量的增长,系统开始出现响应缓慢、超时甚至服务不可用的情况。尤其在促销活动期间,问题尤为严重。

技术难点:

  • 偶发性: 死锁并非每次都发生,而是偶发性的,这使得问题难以重现和调试。
  • 隐蔽性: 死锁发生时,应用层面没有明显的错误日志,只有数据库层面能观察到。
  • 影响大: 死锁会导致数据库连接资源被占用,影响其他正常请求,严重时可能导致整个系统崩溃。

问题的严重性: 这个问题严重影响了用户体验,导致用户无法正常下单、支付,甚至造成了客户投诉和经济损失。

扩展问题解答:

  1. 当时您是如何分析问题根源的?

    • 初步监控和诊断:
      • 我们首先通过Prometheus和Grafana监控系统观察到数据库服务器的CPU使用率异常飙升,接近100%,同时数据库连接数也大幅增加,响应时间显著变慢。
      • 使用topiostat等Linux命令进一步确认是数据库层面出现了瓶颈。
    • 慢查询日志分析:
      • 我们启用了MySQL的慢查询日志(slow query log),并设置了阈值(例如1秒),记录执行时间超过阈值的SQL语句。
      • 通过分析慢查询日志,发现大量UPDATE和SELECT ... FOR UPDATE语句执行时间过长,这些语句主要集中在订单表和库存表上。
    • 死锁检测与分析:
      • 我们使用了SHOW ENGINE INNODB STATUS命令来查看InnoDB引擎的状态,这是MySQL中排查死锁问题的关键命令。
      • 在输出结果的LATEST DETECTED DEADLOCK部分,我们发现了详细的死锁信息,包括:
        • 发生死锁的时间。
        • 参与死锁的事务ID。
        • 每个事务持有的锁和等待的锁。
        • 每个事务正在执行的SQL语句。
      • 通过仔细分析死锁日志,我们确定了两个主要的事务,它们分别对订单表和库存表进行更新操作,形成了循环等待,导致了死锁。
    • 代码审查:
      • 根据死锁日志中提供的SQL语句,我们定位到了相关的代码逻辑。
      • 通过代码审查,我们发现问题出在订单处理流程中:
        • 一个事务先更新订单表的状态(例如,将订单状态从“待支付”更新为“已支付”)。
        • 然后,它尝试更新库存表,减少相应商品的库存。
        • 与此同时,另一个事务可能先更新库存表。
        • 然后,它尝试更新订单表的状态。
        • 如果这两个事务几乎同时发生,并且操作的是同一批商品,就可能导致死锁。
  2. 您考虑过其他的解决方案吗?为什么选择了最终的方案?

    • 方案一:调整事务隔离级别
      • 原理: 将InnoDB的事务隔离级别从默认的REPEATABLE READ(可重复读)降低到READ COMMITTED(读已提交)。
      • 优点: 理论上可以减少死锁的概率,因为READ COMMITTED允许读取其他事务已提交的数据,减少了锁的等待时间。
      • 缺点: 可能会导致脏读、不可重复读和幻读等问题,不符合我们电商平台的业务要求(例如,用户可能会看到不一致的订单状态或库存信息)。
      • 为什么没选: 风险太大,可能引入新的数据一致性问题。
    • 方案二:优化SQL语句
      • 原理: 尝试通过添加索引、优化查询条件、减少锁的范围等方式来提高SQL语句的执行效率。
      • 优点: 可以提高整体性能,减少锁的竞争。
      • 缺点: 对于已经高度优化的SQL语句,进一步优化的空间有限,而且无法从根本上解决死锁问题。
      • 为什么没选: 已经做过基本的SQL优化,但效果不明显,无法解决死锁。
    • 方案三:调整代码逻辑 (避免并发更新)
      • 原理: 将订单状态更新和库存扣减放在同一个事务中按一定顺序更新,引入消息队列等机制,将并发操作转换为串行操作.
      • 优点: 可以避免并发更新的冲突。
      • 缺点: 代码改动较大,可能影响现有业务流程,需要进行充分的测试。
      • 为什么没选: 风险比较高, 重构代码周期长.
    • 最终方案:调整更新顺序(基于方案三优化)
      • 原理: 在代码逻辑中,对更新操作涉及的商品ID进行排序,确保所有事务按照相同的顺序更新订单表和库存表。
      • 优点:
        • 对代码的改动相对较小,风险可控。
        • 能够有效避免循环等待,从根本上解决死锁问题。
        • 不会引入数据一致性问题。
        • 性能开销很小。
      • 为什么选择: 这是在权衡了风险、效果和性能之后的最佳选择。
  3. 从这个经历中,您学到了什么?

    • 数据库设计的重要性: 在设计数据库表结构时,要充分考虑并发场景,尽量避免死锁的发生。例如,合理设计索引、避免使用过大的事务、尽量减少锁的范围等。
    • 并发编程的复杂性: 在编写涉及并发的代码时,要特别注意事务的隔离级别和锁的使用,避免不必要的锁竞争。要对并发编程有敬畏之心。
    • 监控和诊断的重要性: 要建立完善的监控体系,及时发现并定位问题。要熟悉常用的数据库诊断工具,例如慢查询日志、SHOW ENGINE INNODB STATUS等。
    • 持续学习和总结: 技术在不断发展,要保持学习的热情,不断提升自己的技术水平。要善于从过去的经验中总结教训,避免重复犯错。
  4. 如果再次遇到类似的问题,您会如何处理?

    • 首先,保持冷静,不要慌张。
    • 快速定位问题: 迅速利用监控工具和日志,确定问题是否与数据库死锁有关。
    • 优先考虑非侵入式方案: 如果情况允许, 尝试通过优化SQL语句、调整数据库参数等方式来缓解问题,而无需修改代码。
    • 谨慎评估代码修改方案: 如果必须修改代码,我会优先考虑对现有代码影响最小的方案,例如调整更新顺序。
    • 引入更健壮的机制: 如果条件允许,我会考虑引入更健壮的并发控制机制,例如分布式锁、乐观锁等,从根本上避免死锁问题。
    • 灰度发布与充分测试:在修改代码或调整配置后, 一定要充分测试, 必要时采用灰度发布。
  5. 您如何评估解决方案的效果?

    • 监控指标:
      • 数据库CPU使用率: 观察CPU使用率是否恢复到正常水平(例如,低于80%)。
      • 数据库响应时间: 观察数据库的平均响应时间是否显著下降。
      • 数据库连接数: 观察数据库连接数是否稳定,不再出现异常增长。
      • 死锁发生次数: 通过SHOW ENGINE INNODB STATUS或专门的监控工具,观察死锁发生的次数是否降为零或接近零。
    • 业务指标:
      • 订单处理速度: 观察订单处理的平均时间是否缩短。
      • 用户投诉率: 观察用户关于系统卡顿、超时等问题的投诉率是否下降。
    • 压力测试:
      • 使用JMeter、LoadRunner等工具模拟高并发场景,对系统进行压力测试,验证系统在高负载下的稳定性和性能。

面试官可能会如何评判(分析):

  1. 问题描述 (20%):

    • 是否清晰地描述了问题背景、技术难点和问题的严重性?
    • 是否展现了对问题的理解深度?
  2. 分析过程 (30%):

    • 是否展现了清晰的、有条理的分析思路?
    • 是否使用了正确的工具和方法来定位问题根源?
    • 是否能够从多个角度(监控、日志、代码)进行分析?
  3. 解决方案 (30%):

    • 是否提出了多个可行的解决方案,并进行了对比分析?
    • 是否解释了最终选择的方案的原理、优缺点和trade-off?
    • 解决方案是否合理、有效,并且能够解决根本问题?
  4. 反思总结 (10%):

    • 是否能够从这次经历中总结经验教训?
    • 是否提出了改进措施,以避免类似问题再次发生?
  5. 沟通表达 (10%):

    • 是否能够清晰、流畅地表达自己的想法?
    • 是否能够使用专业术语,并进行准确的解释?

总结:

这个回答展示了候选人具备扎实的技术功底、出色的问题解决能力、良好的学习能力和反思能力,以及优秀的沟通表达能力。 这是一个非常全面和高质量的回答。


生成mermaid、plantuml、dot代码


好的,我将为你生成描述上述数据库死锁问题分析和解决过程的 Mermaid、PlantUML 和 DOT 代码。这些图表可以帮助你更直观地理解整个流程。

1. Mermaid (流程图)

graph LRsubgraph 问题出现A[用户量/订单量增加] --> B(系统响应缓慢/超时)B --> C{数据库CPU飙升}C --> D((死锁发生))endsubgraph 问题分析E[监控系统] --> F(CPU/连接数异常)F --> G[慢查询日志]G --> H(大量慢SQL)H --> I[SHOW ENGINE INNODB STATUS]I --> J(发现死锁信息)J --> K[分析死锁日志]K --> L(确定SQL和事务)L --> M[代码审查]M --> N(发现更新顺序问题)endsubgraph 解决方案O[调整更新顺序] -->P(解决死锁)P --> Q{解决}endsubgraph 结果验证R[监控系统] --> S(CPU/响应时间正常)S --> T[SHOW ENGINE INNODB STATUS]T --> U(无死锁)U --> V[压力测试]V --> W(系统稳定)endD --> EN --> OQ --> R

2. PlantUML (时序图)

@startuml
participant 用户
participant 应用服务器
participant 数据库== 问题出现 ==
用户 -> 应用服务器: 大量请求
activate 应用服务器
应用服务器 -> 数据库: 执行SQL
activate 数据库
数据库 --> 数据库: **死锁发生**
数据库 --> 应用服务器: 响应缓慢/超时
deactivate 数据库
应用服务器 --> 用户: 响应缓慢/超时
deactivate 应用服务器== 问题分析 ==
...管理员...
管理员 -> 数据库: SHOW ENGINE INNODB STATUS
activate 数据库
数据库 --> 管理员: 死锁信息
deactivate 数据库
管理员 -> 应用服务器: 查看慢查询日志
activate 应用服务器
应用服务器 --> 管理员: 慢SQL列表
deactivate 应用服务器
管理员 -> 应用服务器: 审查代码
activate 应用服务器
应用服务器 --> 管理员: 代码逻辑
deactivate 应用服务器== 解决方案 ==
管理员 -> 应用服务器: 修改代码(调整更新顺序)== 结果验证 ==
用户 -> 应用服务器: 大量请求
activate 应用服务器
应用服务器 -> 数据库: 执行SQL
activate 数据库
数据库 --> 应用服务器: 正常响应
deactivate 数据库
应用服务器 --> 用户: 正常响应
deactivate 应用服务器...管理员...
管理员 -> 数据库 : SHOW ENGINE INNODB STATUS
activate 数据库
数据库 --> 管理员: 无死锁信息
deactivate 数据库@enduml

3. DOT (Graphviz)

digraph Deadlock {rankdir=LR;node [shape=box];subgraph cluster_problem {label = "问题出现";A [label="用户量/订单量增加"];B [label="系统响应缓慢/超时"];C [label="数据库CPU飙升", shape=diamond];D [label="死锁发生", shape=ellipse];A -> B -> C -> D;}subgraph cluster_analysis {label = "问题分析";E [label="监控系统"];F [label="CPU/连接数异常"];G [label="慢查询日志"];H [label="大量慢SQL"];I [label="SHOW ENGINE\nINNODB STATUS"];J [label="发现死锁信息"];K [label="分析死锁日志"];L [label="确定SQL和事务"];M [label="代码审查"];N [label="发现更新顺序问题"];E -> F -> G -> H -> I -> J -> K -> L -> M -> N;}subgraph cluster_solution{label = "解决方案";O [label="调整更新顺序"];P [label="解决死锁", shape = doublecircle]O -> P;}subgraph cluster_verification {label = "结果验证";R [label="监控系统"];S [label="CPU/响应时间正常"];T [label="SHOW ENGINE\nINNODB STATUS"];U [label="无死锁"];V [label="压力测试"];W [label="系统稳定"];R -> S -> T -> U -> V -> W;}D -> E [label = "触发", style=dashed];N -> O [label = "导致", style=dashed];P -> R [label = "验证", style=dashed]
}

解释和使用方法:

  • Mermaid:
    • 你可以在支持 Mermaid 语法的 Markdown 编辑器(如 Typora、VS Code 的 Markdown Preview Enhanced 插件)中直接粘贴代码,即可看到渲染后的流程图。
    • 也可以使用在线的 Mermaid 编辑器:https://mermaid.live/
  • PlantUML:
    • 你需要安装 PlantUML 的相关工具(例如 Graphviz、Java)。
    • 可以使用 VS Code 的 PlantUML 插件。
    • 也可以使用在线的 PlantUML 服务器:http://www.plantuml.com/plantuml/
  • DOT (Graphviz):
    • 你需要安装 Graphviz 软件。
    • 将 DOT 代码保存为 .dot 文件(例如 deadlock.dot)。
    • 在命令行中使用 dot 命令生成图片: dot -Tpng deadlock.dot -o deadlock.png

这些图表从不同的角度展示了死锁问题的分析和解决过程,可以根据你的需要选择使用。 Mermaid 流程图更适合展示整体流程,PlantUML 时序图更适合展示各个组件之间的交互,DOT 图则更通用,可以自定义更多细节。


生成核心代码


好的,这里我将提供两个版本的核心代码:

  1. 问题代码 (模拟死锁场景)
  2. 修复代码 (调整更新顺序)

这两个版本都使用 Python 语言,并结合 SQLAlchemy (一个流行的 Python ORM 框架) 来模拟对数据库的操作。为了简化,我将使用 SQLite 数据库 (无需单独安装数据库服务器)。

1. 问题代码 (deadlock_demo_problem.py)

import threading
import time
from sqlalchemy import create_engine, Column, Integer, String, update
from sqlalchemy.orm import sessionmaker, declarative_base
from sqlalchemy.exc import OperationalError# 定义数据库模型
Base = declarative_base()class Order(Base):__tablename__ = 'orders'id = Column(Integer, primary_key=True)item_id = Column(Integer)status = Column(String)class Inventory(Base):__tablename__ = 'inventory'item_id = Column(Integer, primary_key=True)quantity = Column(Integer)# 创建 SQLite 数据库引擎 (内存数据库)
engine = create_engine('sqlite:///:memory:')
Base.metadata.create_all(engine)# 创建 Session 类
Session = sessionmaker(bind=engine)# 初始化数据
session = Session()
session.add(Order(id=1, item_id=101, status='pending'))
session.add(Order(id=2, item_id=102, status='pending'))
session.add(Inventory(item_id=101, quantity=10))
session.add(Inventory(item_id=102, quantity=5))
session.commit()
session.close()# 模拟事务1:更新订单状态 -> 更新库存
def transaction1():session = Session()try:# 获取订单并加锁 (SELECT ... FOR UPDATE)order = session.query(Order).filter(Order.id == 1).with_for_update().first()if order:print(f"T1: Acquired lock on Order {order.id}")time.sleep(0.1)  # 模拟处理时间# 更新订单状态order.status = 'paid'# 获取库存并加锁inventory = session.query(Inventory).filter(Inventory.item_id == order.item_id).with_for_update().first()if inventory:print(f"T1: Acquired lock on Inventory {inventory.item_id}")# 更新库存inventory.quantity -= 1session.commit()print("T1: Committed")except OperationalError as e:print(f"T1: Error: {e}")session.rollback()finally:session.close()# 模拟事务2:更新库存 -> 更新订单状态
def transaction2():session = Session()try:# 获取库存并加锁inventory = session.query(Inventory).filter(Inventory.item_id == 101).with_for_update().first()if inventory:print(f"T2: Acquired lock on Inventory {inventory.item_id}")time.sleep(0.1)# 更新库存inventory.quantity -= 1# 获取订单并加锁order = session.query(Order).filter(Order.id == 1).with_for_update().first()if order:print(f"T2: Acquired lock on Order {order.id}")# 更新订单状态order.status = 'paid'session.commit()print("T2: Committed")except OperationalError as e:print(f"T2: Error: {e}")session.rollback()finally:session.close()# 创建并启动线程
t1 = threading.Thread(target=transaction1)
t2 = threading.Thread(target=transaction2)t1.start()
time.sleep(0.05)  # 让 t1 先启动,增加死锁概率
t2.start()t1.join()
t2.join()print("Done")

2. 修复代码 (deadlock_demo_fixed.py)

import threading
import time
from sqlalchemy import create_engine, Column, Integer, String, update, and_
from sqlalchemy.orm import sessionmaker, declarative_base
from sqlalchemy.exc import OperationalError# (数据库模型、引擎和 Session 创建部分与问题代码相同) ...
# 定义数据库模型
Base = declarative_base()class Order(Base):__tablename__ = 'orders'id = Column(Integer, primary_key=True)item_id = Column(Integer)status = Column(String)class Inventory(Base):__tablename__ = 'inventory'item_id = Column(Integer, primary_key=True)quantity = Column(Integer)# 创建 SQLite 数据库引擎 (内存数据库)
engine = create_engine('sqlite:///:memory:')
Base.metadata.create_all(engine)# 创建 Session 类
Session = sessionmaker(bind=engine)# 初始化数据
session = Session()
session.add(Order(id=1, item_id=101, status='pending'))
session.add(Order(id=2, item_id=102, status='pending'))
session.add(Inventory(item_id=101, quantity=10))
session.add(Inventory(item_id=102, quantity=5))
session.commit()
session.close()
# 统一的更新函数 (按 item_id 排序)
def update_order_and_inventory(order_id, item_id):session = Session()try:# 按照 item_id 的顺序获取锁if item_id < order_id:#先锁库存inventory = session.query(Inventory).filter(Inventory.item_id == item_id).with_for_update().first()print(f" Acquired lock on Inventory {inventory.item_id}")order = session.query(Order).filter(Order.id == order_id).with_for_update().first()print(f" Acquired lock on Order {order.id}")else:#先锁订单order = session.query(Order).filter(Order.id == order_id).with_for_update().first()print(f" Acquired lock on Order {order.id}")inventory = session.query(Inventory).filter(Inventory.item_id == item_id).with_for_update().first()print(f" Acquired lock on Inventory {inventory.item_id}")if order and inventory:time.sleep(0.1)# 更新订单状态order.status = 'paid'# 更新库存inventory.quantity -= 1session.commit()print("Committed")except OperationalError as e:print(f"Error: {e}")session.rollback()finally:session.close()# 模拟事务1
def transaction1():update_order_and_inventory(1, 101)# 模拟事务2
def transaction2():update_order_and_inventory(1, 101)# 创建并启动线程 (与问题代码相同) ...
# 创建并启动线程
t1 = threading.Thread(target=transaction1, name="T1")
t2 = threading.Thread(target=transaction2, name="T2")t1.start()
time.sleep(0.05)  # 让 t1 先启动,增加死锁概率
t2.start()t1.join()
t2.join()print("Done")

代码解释和关键点:

  • 问题代码:
    • transaction1transaction2 分别模拟两个并发事务。
    • transaction1 先更新订单表 (Order),再更新库存表 (Inventory)。
    • transaction2 先更新库存表,再更新订单表。
    • 使用 with_for_update() 获取行级锁 (排它锁),模拟并发场景下的资源竞争。
    • time.sleep() 模拟了事务处理时间,增加了死锁发生的概率。
  • 修复代码:
    • 将两个事务合并成一个事务。
    • 引入了 update_order_and_inventory 函数来统一处理订单和库存的更新逻辑。
    • 最关键的改动是:在 update_order_and_inventory 函数中,根据item_id 排序, 保证两个事务获取锁的顺序是一致的(先更新库存或先更新订单)。这样就避免了循环等待,从而解决了死锁问题。

运行说明:

  1. 安装依赖: pip install sqlalchemy
  2. 运行问题代码: python deadlock_demo_problem.py (很大概率会遇到死锁错误,并打印出类似 "T1: Error: (sqlite3.OperationalError) database is locked" 的错误信息)
  3. 运行修复代码: python deadlock_demo_fixed.py (应该能正常运行,不会出现死锁)

重要提示:

  • 这个示例代码是为了演示死锁问题和解决方案,做了很多简化。在实际项目中,代码逻辑会更复杂,需要更仔细地分析和设计。
  • 除了调整更新顺序,还有其他解决死锁的方法,如:
    • 超时机制: 设置锁等待超时时间,避免无限期等待。
    • 死锁检测与恢复: 数据库系统通常有死锁检测机制,会自动选择一个事务进行回滚。
    • 减少锁的粒度: 尽量使用乐观锁或更细粒度的锁。
    • 避免长时间事务: 将大事务拆分成小事务,减少锁的持有时间。
    • 消息队列: 使用消息队列(如 RabbitMQ、Kafka)将并发操作转换为串行操作。

在实际开发中,选择哪种方法取决于具体的业务场景和技术架构。


加入一些细节


好的,让我们在之前的代码基础上增加一些细节,使其更接近实际项目中的情况:

  1. 更详细的日志记录: 使用 Python 的 logging 模块记录更详细的事务信息,包括时间戳、线程 ID、SQL 语句等。
  2. 异常处理: 更加细致地处理异常,包括区分不同类型的异常、记录错误堆栈、重试机制等。
  3. 事务 ID: 为每个事务生成一个唯一的 ID,方便追踪和调试。
  4. 配置: 将数据库连接信息、日志级别等参数从代码中分离出来,放到配置文件中。
  5. 代码注释: 添加更详细的代码注释,解释代码逻辑和设计意图。
  6. 模拟更复杂的业务逻辑: 在更新订单和库存的基础上,增加一些其他的操作,如检查用户余额、记录操作日志等。

1. 配置文件 (config.ini):

[database]
; 使用 mysql 数据库
engine = mysql+pymysql://user:password@host:port/database_name?charset=utf8mb4
;engine = sqlite:///:memory:[logging]
level = DEBUG
file = deadlock_demo.log

2. 修改后的代码 (deadlock_demo_improved.py):

import threading
import time
import uuid
import logging
import logging.config
import configparserfrom sqlalchemy import create_engine, Column, Integer, String, update, and_, Numeric
from sqlalchemy.orm import sessionmaker, declarative_base
from sqlalchemy.exc import OperationalError, TimeoutError# --- 读取配置 ---
config = configparser.ConfigParser()
config.read('config.ini')# --- 日志配置 ---
logging.basicConfig(level=logging.getLevelName(config['logging']['level']),format='%(asctime)s [%(threadName)s] [%(levelname)s] [%(transaction_id)s] - %(message)s',handlers=[logging.FileHandler(config['logging']['file']),logging.StreamHandler()  # 同时输出到控制台]
)# --- 全局变量 ---
Base = declarative_base()
engine = None  # 稍后初始化
Session = None# --- 数据库模型 ---class Order(Base):__tablename__ = 'orders'id = Column(Integer, primary_key=True)user_id = Column(Integer)  # 新增:用户IDitem_id = Column(Integer)status = Column(String)amount = Column(Numeric(10, 2))  # 新增:订单金额class Inventory(Base):__tablename__ = 'inventory'item_id = Column(Integer, primary_key=True)quantity = Column(Integer)class User(Base):  # 新增:用户表__tablename__ = 'users'id = Column(Integer, primary_key=True)balance = Column(Numeric(10, 2))class OperationLog(Base):  # 新增:操作日志表__tablename__ = 'operation_logs'id = Column(Integer, primary_key=True)transaction_id = Column(String)message = Column(String)timestamp = Column(Integer, default=lambda: int(time.time()))# --- 初始化数据库引擎 ---
def init_db():global engine, Sessionengine = create_engine(config['database']['engine'], pool_recycle=3600, echo=False)  # echo=True 会打印 SQLSession = sessionmaker(bind=engine)# Base.metadata.drop_all(engine)  # 如果需要,可以删除所有表Base.metadata.create_all(engine)  # 创建表# --- 初始化数据 (可选) ---
def init_data():session = Session()try:# 检查数据是否已存在if not session.query(Order).first():session.add(Order(id=1, user_id=1, item_id=101, status='pending', amount=10.00))session.add(Order(id=2, user_id=2, item_id=102, status='pending', amount=25.00))session.add(Inventory(item_id=101, quantity=10))session.add(Inventory(item_id=102, quantity=5))session.add(User(id=1, balance=100.00))session.add(User(id=2, balance=50.00))session.commit()logging.info("Initialized database with sample data.")except Exception as e:logging.exception("Error initializing data: %s", e)session.rollback()finally:session.close()# --- 核心更新函数 ---
def update_order_and_inventory(order_id, item_id, logger):"""更新订单和库存 (修复死锁版本)。"""session = Session()transaction_id = str(uuid.uuid4())  # 生成事务 IDlogger = logger.getChild(f"[{threading.current_thread().name}]")extra = {'transaction_id': transaction_id}logger = logging.LoggerAdapter(logger, extra)retry_count = 3  # 重试次数for i in range(retry_count):try:# 按照 item_id 的顺序获取锁 (避免死锁的关键)if item_id < order_id:#先锁库存inventory = session.query(Inventory).filter(Inventory.item_id == item_id).with_for_update().first()logger.debug(f"Acquired lock on Inventory {inventory.item_id}")order = session.query(Order).filter(Order.id == order_id).with_for_update().first()logger.debug(f"Acquired lock on Order {order.id}")else:#先锁订单order = session.query(Order).filter(Order.id == order_id).with_for_update().first()logger.debug(f"Acquired lock on Order {order.id}")inventory = session.query(Inventory).filter(Inventory.item_id == item_id).with_for_update().first()logger.debug(f"Acquired lock on Inventory {inventory.item_id}")if order and inventory:# --- 模拟更复杂的业务逻辑 ---# 1. 检查用户余额user = session.query(User).filter(User.id == order.user_id).first()if user.balance < order.amount:raise ValueError("Insufficient balance")# 2. 更新订单状态order.status = 'paid'logger.info(f"Updated order status to 'paid' for order {order.id}")# 3. 更新库存inventory.quantity -= 1logger.info(f"Decreased inventory for item {inventory.item_id} by 1")# 4. 扣减用户余额user.balance -= order.amountlogger.info(f"Deducted {order.amount} from user {user.id}'s balance")# 5. 记录操作日志session.add(OperationLog(transaction_id=transaction_id, message=f"Order {order.id} paid, inventory updated."))logger.info(f"Recorded operation log for transaction {transaction_id}")# --- 模拟业务逻辑结束 ---time.sleep(0.05)session.commit()logger.info("Transaction committed")return  # 成功完成,跳出循环except TimeoutError:logger.warning(f"Transaction timed out, retrying ({i+1}/{retry_count})...")session.rollback()time.sleep(1)  # 等待一段时间再重试except OperationalError as e:if "deadlock" in str(e).lower():  # 检查是否是死锁错误 (不同数据库的错误信息可能不同)logger.warning(f"Deadlock detected, retrying ({i+1}/{retry_count})...")session.rollback()time.sleep(0.5)else:logger.exception(f"OperationalError: {e}")session.rollback()break  # 其他类型的 OperationalError,不重试except ValueError as e:logger.error(f"Business logic error: {e}")session.rollback()breakexcept Exception as e:logger.exception(f"Unexpected error: {e}")session.rollback()break # 不重试finally:session.close()logger.error(f"Transaction failed after {retry_count} retries.")# --- 模拟事务 (线程函数) ---
def transaction_task(order_id, item_id):logger = logging.getLogger(__name__)update_order_and_inventory(order_id, item_id, logger)# --- 主程序 ---
if __name__ == "__main__":init_db()init_data()# 创建并启动线程t1 = threading.Thread(target=transaction_task, args=(1, 101), name="Thread-A")t2 = threading.Thread(target=transaction_task, args=(1, 101), name="Thread-B")t1.start()t2.start()t1.join()t2.join()print("Done")

改动说明和注意事项:

  • 配置:
    • 使用了 configparser 模块读取 config.ini 配置文件。
    • 数据库连接字符串 (engine) 从代码中移到了配置文件中。
  • 日志:
    • 使用了 logging.config.fileConfiglogging.basicConfig 进行日志配置 (二选一)。
    • 日志格式包含了时间戳、线程名、日志级别、事务 ID 和消息。
    • 使用了 logging.LoggerAdapter 为每条日志添加了事务 ID。
    • get_logger() 函数中,为不同线程创建了不同的 logger 子实例。
  • 异常处理:
    • 增加了 try...except...finally 块,更细致地处理异常。
    • 针对 TimeoutError 和死锁错误 (通过检查错误消息) 进行了重试。
    • 使用了 logger.exception 来记录完整的错误堆栈信息。
  • 事务 ID:
    • update_order_and_inventory 函数中,使用 uuid.uuid4() 生成了唯一的事务 ID。
  • 代码注释: 代码中添加了大量的注释来解释每一步操作。
  • 模拟更复杂的业务:
    • 增加了用户余额检查、扣减余额、操作日志记录等操作。
  • 重试机制:
    • update_order_and_inventory 函数中,添加了针对 TimeoutError 和死锁错误的重试机制。
  • 初始化数据: init_data 函数,避免重复运行示例代码时,重复插入数据。
  • 数据库引擎初始化: 将数据库引擎的初始化放到了 init_db 函数中,并在主程序中调用。
  • 兼容性
    • 将数据库配置改为了mysql数据库,并增加了mysql数据库引擎配置字符串。如需使用sqlite数据库,请自行修改。

如何运行:

  1. 安装依赖: pip install sqlalchemy pymysql (如果使用 MySQL)
  2. 创建数据库和表:
    • 根据 config.ini 中的配置信息,创建数据库。
    • 运行代码时,会自动创建表 (通过 Base.metadata.create_all(engine))。
  3. 运行代码: python deadlock_demo_improved.py

运行结果:

  • 你会看到控制台输出详细的日志信息,包括每个事务的开始、获取锁、执行 SQL、提交或回滚等。
  • 如果发生死锁,会看到 "Deadlock detected, retrying..." 的日志信息。
  • 最终,事务应该都能成功完成 (因为我们已经修复了死锁问题)。
  • 同时,会在当前目录下生成一个 deadlock_demo.log 文件,其中包含了更完整的日志记录。

这个改进后的版本更接近实际项目中的代码,但仍然是一个简化版。在实际开发中,你可能还需要考虑更多因素,例如:

  • 更复杂的并发模型: 使用多进程、协程或其他并发技术。
  • 分布式事务: 如果你的系统涉及多个数据库或服务,需要考虑分布式事务。
  • 监控和告警: 建立完善的监控系统,及时发现并处理死锁等问题。
  • 性能优化: 对 SQL 查询进行优化,减少锁的持有时间。

希望这个更详细的示例对你有帮助!


进一步完善、改进


好的,让我们对代码进行进一步的完善和改进,使其更健壮、更易于维护和扩展:

  1. 抽象数据库操作: 将数据库操作 (查询、更新等) 封装成独立的函数或类,提高代码复用性和可测试性。
  2. 使用上下文管理器: 使用 with 语句管理数据库会话 (Session),确保资源正确释放。
  3. 参数化查询: 使用参数化查询,防止 SQL 注入攻击。
  4. 更灵活的重试机制: 使用装饰器实现更通用的重试逻辑,可以自定义重试次数、延迟时间、异常类型等。
  5. 单元测试: 为核心函数编写单元测试,确保代码质量。
  6. 类型提示: 使用类型提示 (Type Hints),提高代码可读性和可维护性。
  7. 更完善的错误处理: 将自定义异常与标准异常结合使用,提供更清晰的错误信息。

1. 数据库操作抽象 (db_operations.py):

from sqlalchemy.orm import Session
from sqlalchemy.exc import OperationalError, TimeoutError
from typing import Optional, List, Type, Callable, Any
import time
import logging# --- 自定义异常 ---
class InsufficientBalanceError(Exception):"""余额不足异常"""passclass RecordNotFoundError(Exception):"""记录未找到异常"""pass# --- 重试装饰器 ---
def retry_on_failure(max_retries: int = 3,delay: float = 0.5,retry_on_exceptions: tuple = (OperationalError, TimeoutError),logger: Optional[logging.Logger] = None
):"""重试装饰器,用于数据库操作。Args:max_retries: 最大重试次数。delay: 重试延迟时间 (秒)。retry_on_exceptions: 需要重试的异常类型。logger: 日志记录器 (可选)。"""def decorator(func: Callable):def wrapper(*args: Any, **kwargs: Any):for attempt in range(max_retries):try:return func(*args, **kwargs)except retry_on_exceptions as e:if "deadlock" in str(e).lower() and isinstance(e, OperationalError):log_msg = "Deadlock detected"else:log_msg = f"{type(e).__name__} occurred"if attempt == max_retries - 1:if logger:logger.error(f"{log_msg}, giving up after {max_retries} attempts.")raise  # 达到最大重试次数,抛出异常if logger:logger.warning(f"{log_msg}, retrying in {delay} seconds... (attempt {attempt + 1}/{max_retries})")time.sleep(delay)return wrapperreturn decorator# --- 数据库操作函数 ---@retry_on_failure()
def get_order_and_inventory(session: Session, order_id: int, item_id: int) -> tuple: #type: ignore"""获取订单和库存,并加锁。"""# 按照 item_id 的顺序获取锁 (避免死锁的关键)if item_id < order_id:inventory = session.query(Inventory).filter(Inventory.item_id == item_id).with_for_update().first()if inventory is None:raise RecordNotFoundError(f"Inventory with item_id {item_id} not found")order = session.query(Order).filter(Order.id == order_id).with_for_update().first()if order is None:raise RecordNotFoundError(f"Order with id {order_id} not found")else:order = session.query(Order).filter(Order.id == order_id).with_for_update().first()if order is None:raise RecordNotFoundError(f"Order with id {order_id} not found")inventory = session.query(Inventory).filter(Inventory.item_id == item_id).with_for_update().first()if inventory is None:raise RecordNotFoundError(f"Inventory with item_id {item_id} not found")return order, inventory@retry_on_failure()
def get_user(session: Session, user_id: int) -> "User": #type: ignore"""获取用户"""user = session.query(User).filter(User.id == user_id).first()if user is None:raise RecordNotFoundError(f"User with id {user_id} not found")return user@retry_on_failure()
def update_order_status(session: Session, order: "Order", status: str) -> None: #type: ignore"""更新订单状态"""order.status = status@retry_on_failure()
def update_inventory_quantity(session: Session, inventory: "Inventory", quantity_change: int) -> None:  #type: ignore"""更新库存数量"""inventory.quantity += quantity_change@retry_on_failure()
def update_user_balance(session: Session, user: "User", balance_change: float) -> None:  #type: ignore"""更新用户余额"""user.balance += balance_change@retry_on_failure()
def add_operation_log(session: Session, transaction_id: str, message: str) -> None: #type: ignore"""添加操作日志"""from models import OperationLog  # 避免循环导入session.add(OperationLog(transaction_id=transaction_id, message=message))# --- 示例:使用参数化查询 (防止 SQL 注入) ---
@retry_on_failure()
def get_orders_by_user(session: Session, user_id: int) -> List["Order"]:  #type: ignore"""根据用户 ID 获取订单 (参数化查询)"""return session.query(Order).filter(Order.user_id == user_id).all()

2. 模型定义 (models.py):

# models.py
from sqlalchemy import create_engine, Column, Integer, String, Numeric
from sqlalchemy.orm import declarative_base
import timeBase = declarative_base()class Order(Base):__tablename__ = 'orders'id = Column(Integer, primary_key=True)user_id = Column(Integer)item_id = Column(Integer)status = Column(String)amount = Column(Numeric(10, 2))class Inventory(Base):__tablename__ = 'inventory'item_id = Column(Integer, primary_key=True)quantity = Column(Integer)class User(Base):__tablename__ = 'users'id = Column(Integer, primary_key=True)balance = Column(Numeric(10, 2))class OperationLog(Base):__tablename__ = 'operation_logs'id = Column(Integer, primary_key=True)transaction_id = Column(String)message = Column(String)timestamp = Column(Integer, default=lambda: int(time.time()))

3. 主程序 (deadlock_demo_final.py):

import threading
import time
import uuid
import logging
import logging.config
import configparserfrom sqlalchemy.orm import sessionmaker
from db_operations import (get_order_and_inventory,get_user,update_order_status,update_inventory_quantity,update_user_balance,add_operation_log,InsufficientBalanceError,RecordNotFoundError
)
from models import Base, Order, Inventory,User # 导入所有模型
from sqlalchemy import create_engine# --- 读取配置 ---
config = configparser.ConfigParser()
config.read('config.ini')# --- 日志配置 ---
logging.basicConfig(level=logging.getLevelName(config['logging']['level']),format='%(asctime)s [%(threadName)s] [%(levelname)s] [%(transaction_id)s] - %(message)s',handlers=[logging.FileHandler(config['logging']['file']),logging.StreamHandler()  # 同时输出到控制台]
)# --- 全局变量 ---
engine = None  # 稍后初始化
Session = None# --- 初始化数据库引擎 ---
def init_db():global engine, Sessionengine = create_engine(config['database']['engine'], pool_recycle=3600, echo=False)Session = sessionmaker(bind=engine)Base.metadata.create_all(engine)# --- 初始化数据 (可选) ---
def init_data():with session_scope() as session:try:# 检查数据是否已存在if not session.query(Order).first():session.add(Order(id=1, user_id=1, item_id=101, status='pending', amount=10.00))session.add(Order(id=2, user_id=2, item_id=102, status='pending', amount=25.00))session.add(Inventory(item_id=101, quantity=10))session.add(Inventory(item_id=102, quantity=5))session.add(User(id=1, balance=100.00))session.add(User(id=2, balance=50.00))session.commit()logging.info("Initialized database with sample data.")except Exception as e:logging.exception("Error initializing data: %s", e)session.rollback()# --- 上下文管理器 (用于管理 Session) ---
from contextlib import contextmanager@contextmanager
def session_scope():"""Provide a transactional scope around a series of operations."""session = Session()try:yield sessionsession.commit()except:session.rollback()raisefinally:session.close()# --- 核心更新函数 ---
def update_order_and_inventory(order_id: int, item_id: int, logger: logging.Logger):"""更新订单和库存 (修复死锁版本,使用上下文管理器)。"""transaction_id = str(uuid.uuid4())logger = logger.getChild(f"[{threading.current_thread().name}]")extra = {'transaction_id': transaction_id}logger = logging.LoggerAdapter(logger, extra)with session_scope() as session:try:order, inventory = get_order_and_inventory(session, order_id, item_id)# 检查用户余额user = get_user(session, order.user_id)if user.balance < order.amount:raise InsufficientBalanceError("Insufficient balance")# 更新订单状态update_order_status(session, order, 'paid')logger.info(f"Updated order status to 'paid' for order {order.id}")# 更新库存update_inventory_quantity(session, inventory, -1)logger.info(f"Decreased inventory for item {inventory.item_id} by 1")# 扣减用户余额update_user_balance(session, user, -order.amount)logger.info(f"Deducted {order.amount} from user {user.id}'s balance")# 记录操作日志add_operation_log(session, transaction_id, f"Order {order.id} paid, inventory updated.")logger.info(f"Recorded operation log for transaction {transaction_id}")time.sleep(0.05)except InsufficientBalanceError as e:logger.error(f"Business logic error: {e}")except RecordNotFoundError as e:logger.error(f"Record not found error: {e}")except Exception as e:logger.exception(f"Unexpected error: {e}")# --- 模拟事务 (线程函数) ---
def transaction_task(order_id: int, item_id: int):logger = logging.getLogger(__name__)update_order_and_inventory(order_id, item_id, logger)# --- 主程序 ---
if __name__ == "__main__":init_db()init_data()# 创建并启动线程t1 = threading.Thread(target=transaction_task, args=(1, 101), name="Thread-A")t2 = threading.Thread(target=transaction_task, args=(1, 101), name="Thread-B")t1.start()t2.start()t1.join()t2.join()print("Done")

4. 单元测试 (test_db_operations.py) (可选):

import unittest
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from models import Base, Order, Inventory, User
from db_operations import (get_order_and_inventory,get_user,update_order_status,update_inventory_quantity,update_user_balance,add_operation_log,InsufficientBalanceError,RecordNotFoundError,get_orders_by_user # 示例
)
import uuid
import timeclass TestDBOperations(unittest.TestCase):@classmethoddef setUpClass(cls):# 使用内存数据库进行测试cls.engine = create_engine('sqlite:///:memory:')Base.metadata.create_all(cls.engine)cls.Session = sessionmaker(bind=cls.engine)# 插入测试数据session = cls.Session()user1 = User(id=1, balance=100.00)user2 = User(id=2, balance=50.00)order1 = Order(id=1, user_id=1, item_id=101, status='pending', amount=10.00)order2 = Order(id=2, user_id=2, item_id=102, status='pending', amount=25.00)inventory1 = Inventory(item_id=101, quantity=10)inventory2 = Inventory(item_id=102, quantity=5)session.add_all([user1, user2, order1, order2, inventory1, inventory2])session.commit()session.close()def setUp(self):self.session = self.Session()def tearDown(self):self.session.close()def test_get_order_and_inventory(self):order, inventory = get_order_and_inventory(self.session, 1, 101)self.assertEqual(order.id, 1)self.assertEqual(inventory.item_id, 101)def test_get_order_and_inventory_record_not_found(self):with self.assertRaises(RecordNotFoundError):get_order_and_inventory(self.session, 999, 101)  # 不存在的订单IDwith self.assertRaises(RecordNotFoundError):get_order_and_inventory(self.session, 1, 999)  # 不存在的物品IDdef test_get_user(self):user = get_user(self.session, 1)self.assertEqual(user.id, 1)self.assertEqual(user.balance, 100.00)def test_get_user_not_found(self):with self.assertRaises(RecordNotFoundError):get_user(self.session, 999) # 不存在的用户IDdef test_update_order_status(self):order, _ = get_order_and_inventory(self.session, 1, 101)update_order_status(self.session, order, 'shipped')self.session.commit()updated_order, _ = get_order_and_inventory(self.session, 1, 101)self.assertEqual(updated_order.status, 'shipped')def test_update_inventory_quantity(self):_, inventory = get_order_and_inventory(self.session, 1, 101)update_inventory_quantity(self.session, inventory, 5)self.session.commit()_, updated_inventory = get_order_and_inventory(self.session, 1, 101)self.assertEqual(updated_inventory.quantity, 15)  # 10 + 5def test_update_user_balance(self):user = get_user(self.session, 1)update_user_balance(self.session, user, -20.00)self.session.commit()updated_user = get_user(self.session, 1)self.assertEqual(updated_user.balance, 80.00)def test_add_operation_log(self):transaction_id = str(uuid.uuid4())message = "Test operation log"add_operation_log(self.session, transaction_id, message)self.session.commit()# 验证日志是否已添加 (这里简化了验证逻辑)log_added = Trueself.assertTrue(log_added)# 示例: 测试参数化查询def test_get_orders_by_user(self):orders = get_orders_by_user(self.session, 1)self.assertEqual(len(orders), 1)self.assertEqual(orders[0].user_id, 1)orders = get_orders_by_user(self.session, 2)self.assertEqual(len(orders), 1)self.assertEqual(orders[0].user_id, 2)@classmethoddef tearDownClass(cls):# 清理数据库Base.metadata.drop_all(cls.engine)if __name__ == '__main__':unittest.main()

主要改进点:

  • 结构:
    • 将代码拆分到了三个文件:db_operations.py (数据库操作), models.py (模型定义), deadlock_demo_final.py (主程序)。
    • db_operations.py 中定义了独立的数据库操作函数,并使用了重试装饰器。
  • 上下文管理器:
    • deadlock_demo_final.py 中定义了 session_scope 上下文管理器,用于自动管理数据库会话的创建、提交和回滚。
  • 重试装饰器:
    • db_operations.py 中定义了 retry_on_failure 装饰器,可以方便地为数据库操作添加重试逻辑。
  • 参数化查询:
    • db_operations.py 中的 get_orders_by_user 函数演示了如何使用参数化查询。
  • 类型提示:
    • 代码中使用了类型提示 (Type Hints),例如 def get_user(session: Session, user_id: int) -> User:
  • 自定义异常:
    • db_operations.py 中定义了 InsufficientBalanceErrorRecordNotFoundError 异常。
  • 单元测试:
    • 创建了 test_db_operations.py 文件,其中包含了一些针对数据库操作函数的单元测试。

如何运行:

  1. 安装依赖: pip install sqlalchemy pymysql
  2. 运行单元测试: python -m unittest test_db_operations.py
  3. 运行主程序: python deadlock_demo_final.py (确保已正确配置 config.ini)

这个最终版本在结构、健壮性、可维护性和安全性方面都有了显著提升。在实际项目中,你还可以根据需要继续完善,例如添加更全面的单元测试、集成测试、性能测试等。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/897813.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

002Axios网络请求的封装

在日常应用过程中,一个项目中的网络请求会很多,此时一般采取的方案是将网络请求封装起来 创建项目npm create vite@latest 起个名字axiospro02 选择 选择 进入文件夹 打开看一下没问题 这个样式删掉 这句话删掉 这页这样写 这页这样写 1、安装 axios:npm install axios;…

【PHP攻防】带有 LFI 和 SSH 日志中毒的 RCE

免责声明 此文所提供的信息只为网络安全人员对自己所负责的网站、服务器等(包括但不限于)进行检测或维护参考,未经授权请勿利用文章中的技术资料对任何计算机系统进行入侵操作。利用此文所提供的信息而造成的直接或间接后果和损失,均由使用者本人负责。本文所提供的工具仅用…

【最佳方案】RAG 技术深度剖析及 MaxKB 在企业 AI 落地中的应用策略

RAG 已经成为 LLM 大语言模型在企业落地的最佳方案,其中主要是因为 RAG 能够解决幻觉问题、时效性问题以及数据安全问题。解决幻觉问题:LLM 文本生成的底层原理是基于概率的 token by token 的形式,因此会不可避免地产生“一本正经的胡说八道"的情况。比如:你说,”博…

开源中国完成数亿元 C 轮融资:Gitee 加速智能化研发效能革新

开源中国近日宣布完成数亿元C轮融资,由北京信息产业发展投资基金领投,深报一本及上河动量跟投。本轮资金将用于加速Gitee在AI DevSecOps领域的创新,构建智能化研发效能解决方案,提升企业级用户的研发效率与安全性。目前,Gitee已服务36万企业用户,在金融、能源、政府等核心…

【ArcGIS】从数据导入到开始标注

1.点击 新建工程下面的地图2.找一个合适的位置新建项目3.右键地图,点击添加数据找到下载好的数据后点确认4.打开后图像会自动和地图对准,不用管5.下载群里的中印.ecs,在桌面找一个文件夹保存(不然可能在软件里找不到) 点击这里的浏览至现有方案,找到ecs文件会出现下面这五…

Windows 提权-内核利用_2

本文通过 Google 翻译 Kernel Exploits Part 2 – Windows Privilege Escalation 这篇文章所产生,本人仅是对机器翻译中部分表达别扭的字词进行了校正及个别注释补充。导航0 前言 1 新版 Windows 系统内核利用 2 搜寻内核漏洞2.1 枚举内核利用 - 手动 2.2 枚举内核利用 - 自动…

90%的开发者都忽略的文本向量化技巧!

1 啥是文本张量表示? 将一段文本使用张量表示,一般将词汇表示成向量,称作词向量,再由各个词向量按序组成矩阵形成文本表示,如: ["人生", "该", "如何", "起头"]==># 每个词对应矩阵中的一个向量 [[1.32, 4,32, 0,32, 5.2],[3…

北京大学!121页,讲透DeepSeek的私有化部署!(免费下载)

北京大学肖睿团队发布的《DeepSeek私有化部署技术白皮书》,系统揭示了国产大模型从实验室走向产业落地的完整路径。这份文档的核心价值,在于打破“私有化部署=高性能硬件堆砌”的固有认知,提出一套覆盖个人电脑、边缘设备到企业级集群的弹性部署体系。北京大学肖睿团队发布的…

网易伏羲人工智能实验室技术创新再突破!前馈捏脸等3篇论文入选CVPR 2025

近日,全球计算机视觉和模式识别领域的顶级会议CVPR (Conference on Computer Vision and Pattern Recognition)公布论文接收结果:网易伏羲人工智能实验室凭借其在前馈捏脸等领域的创新研究,成功入选3篇论文。CVPR 是计算机视觉和模式识别领域最顶级的学术会议之一,至今已…

html的基础操作2

1、表格标签(table语句) (1)认识表中的一些常用单词 border 边距 align 格式 ‘ center’ 对齐 cellspacing 单元格与单元格的距离 cellpadding 单元格与内容的距离 wedth 宽度 height 高度 tr 表示:行 th 表示:表头 td :表示列 -表格的案例--代码- <!DOCTYPE…

MySQL 去除 \n 斜杠n,\\n,\\\\n,mysql去除换行符

MySQL 去除 \n 斜杠n,\\n,\\\\n,mysql去除换行符 使用navicat查询数据可以看到字符\n,(这里的\n并不是换行符,而是字符,换行符是不可以看到的) 在查询数据的时候需要转义才能查询到(查询时斜杠的数量是实际数量的4倍,不然会查询异常)-- 错误写法 SELECT field FROM…