🏗️ 整体架构设计

🌐
商户接入层
API Gateway
⚙️
支付核心引擎
路由/风控/账务
🏦
银行通道层
通道适配
📊 账务中心 🔒 安全中心 📈 数据中心

💡 核心设计原则

高可用性

99.99% SLA 保障,多活容灾设计,故障自动切换

数据一致性

分布式事务保证,最终一致性,对账机制兜底

安全合规

PCI DSS 认证,数据加密存储,敏感信息脱敏

可扩展性

微服务架构,支持水平扩容,新通道快速接入

📦 核心模块设计

1️⃣ 订单中心

负责订单的创建、状态管理、生命周期控制


# 订单表设计
CREATE TABLE payment_order (
    id              BIGINT PRIMARY KEY,
    order_no        VARCHAR(32) UNIQUE NOT NULL,  商户订单号
    pay_order_no    VARCHAR(32) UNIQUE,           支付订单号
    merchant_id     VARCHAR(32) NOT NULL,         商户 ID
    amount          DECIMAL(12,2) NOT NULL,       订单金额
    currency        VARCHAR(3) DEFAULT 'CNY',    币种
    status          TINYINT NOT NULL,             订单状态
    channel         VARCHAR(32),                  支付通道
    create_time     DATETIME NOT NULL,
    update_time     DATETIME NOT NULL,
    expire_time     DATETIME,                     过期时间
    INDEX idx_merchant (merchant_id, create_time),
    INDEX idx_status (status, update_time)
);
                    

2️⃣ 支付路由

智能选择最优支付通道,支持多策略路由

📥
请求接入
🎯
规则匹配
🔄
通道选择
📤
路由下发

3️⃣ 通道适配层

统一不同银行/支付机构的接口差异


# 通道适配器接口定义
class PaymentChannelAdapter:
    def create_order(self, request) -> OrderResult:
        """创建支付订单"""
        pass
    
    def query_order(self, order_no) -> QueryResult:
        """查询订单状态"""
        pass
    
    def refund(self, refund_request) -> RefundResult:
        """退款处理"""
        pass
    
    def verify_callback(self, callback_data) -> bool:
        """验证回调签名"""
        pass

# 支付宝适配器实现
class AlipayAdapter(PaymentChannelAdapter):
    def create_order(self, request):
        # 调用支付宝 SDK
        return alipay_client.trade_create(request)
    
    def verify_callback(self, callback_data):
        # 验证支付宝签名
        return alipay_client.verify(callback_data)
                    

4️⃣ 风控中心

实时风险识别与拦截

🛡️ 风控规则引擎
  • 黑名单检查:用户/商户/设备黑名单
  • 限额控制:单笔/单日/单月限额
  • 频次控制:单位时间内交易次数
  • 异常检测:异地支付、大额异常
  • 智能评分:机器学习模型实时评分

5️⃣ 对账中心

确保资金安全,发现并处理差异

📥
获取账单
⚖️
数据比对
⚠️
差异处理
平账完成

🗄️ 核心表设计

支付流水表


CREATE TABLE payment_flow (
    id              BIGINT PRIMARY KEY,
    flow_no         VARCHAR(32) UNIQUE NOT NULL,  流水号
    order_no        VARCHAR(32) NOT NULL,         关联订单号
    merchant_id     VARCHAR(32) NOT NULL,
    channel         VARCHAR(32) NOT NULL,         支付通道
    channel_order_no VARCHAR(64),                 通道订单号
    amount          DECIMAL(12,2) NOT NULL,
    fee             DECIMAL(10,2) DEFAULT 0,      手续费
    status          TINYINT NOT NULL,             SUCCESS/FAIL/PROCESSING
    error_code      VARCHAR(32),                  错误码
    error_msg       VARCHAR(512),                 错误信息
    request_data    TEXT,                         请求报文
    response_data   TEXT,                         响应报文
    create_time     DATETIME NOT NULL,
    complete_time   DATETIME,
    INDEX idx_order (order_no),
    INDEX idx_channel (channel, create_time)
);
                    

商户配置表


CREATE TABLE merchant_config (
    id              BIGINT PRIMARY KEY,
    merchant_id     VARCHAR(32) UNIQUE NOT NULL,
    merchant_name   VARCHAR(128) NOT NULL,
    api_key         VARCHAR(64) UNIQUE NOT NULL,  API 密钥
    secret_key      VARCHAR(128) NOT NULL,        签名密钥
    notify_url      VARCHAR(256),                 回调地址
    status          TINYINT NOT NULL,             启用/禁用
    rate            DECIMAL(5,4) DEFAULT 0.006,   费率
    settle_mode     TINYINT DEFAULT 1,            结算模式
    create_time     DATETIME NOT NULL,
    update_time     DATETIME NOT NULL
);
                    

通道配置表


CREATE TABLE channel_config (
    id              BIGINT PRIMARY KEY,
    channel_code    VARCHAR(32) UNIQUE NOT NULL,  通道编码
    channel_name    VARCHAR(64) NOT NULL,
    provider        VARCHAR(64) NOT NULL,         提供方
    config_data     JSON NOT NULL,                通道配置
    status          TINYINT NOT NULL,             启用/禁用
    weight          INT DEFAULT 100,              路由权重
    daily_limit     DECIMAL(15,2),                日限额
    single_limit    DECIMAL(12,2),                单笔限额
    support_methods JSON,                         支持的支付方式
    create_time     DATETIME NOT NULL
);
                    

📡 核心 API 设计

🌐 统一支付接口

# 请求示例
POST /api/v1/payment/pay
Content-Type: application/json

{
    "merchant_id": "M20240101001",
    "order_no": "ORD20240101123456",
    "amount": "100.00",
    "currency": "CNY",
    "subject": "商品描述",
    "body": "商品详情",
    "notify_url": "https://merchant.com/notify",
    "return_url": "https://merchant.com/return",
    "pay_method": "ALIPAY",
    "timestamp": "1704067200",
    "sign": "abc123..."
}

# 响应示例
{
    "code": "SUCCESS",
    "message": "success",
    "data": {
        "pay_order_no": "P20240101123456",
        "pay_url": "https://pay.gateway.com/h5/xxx",
        "qr_code": "data:image/png;base64,..."
    }
}
                    

🔥🔥🔥 核心难点与解决方案

🔄 1. 分布式事务一致性

⚠️ 问题描述

支付涉及多个服务:订单服务、账户服务、通道服务,如何保证数据一致性?

✅ 解决方案:TCC + 本地消息表

# TCC 三阶段实现
class PaymentService:
    
    # 阶段 1: Try - 资源预留
    def try_payment(self, order_id, amount):
        # 1. 冻结用户账户余额
        account_service.freeze_balance(order_id, amount)
        # 2. 创建支付记录(状态:TRYING)
        payment_record.create(status="TRYING")
        return True
    
    # 阶段 2: Confirm - 确认提交
    def confirm_payment(self, order_id):
        # 1. 扣减已冻结的余额
        account_service.deduct_frozen(order_id)
        # 2. 更新支付记录(状态:SUCCESS)
        payment_record.update(order_id, status="SUCCESS")
    
    # 阶段 3: Cancel - 回滚取消
    def cancel_payment(self, order_id):
        # 1. 解冻账户余额
        account_service.unfreeze_balance(order_id)
        # 2. 更新支付记录(状态:CANCEL)
        payment_record.update(order_id, status="CANCEL")
                        
💡 核心:Try 阶段预留资源,Confirm/Cancel 幂等执行,支持自动重试

🔁 2. 接口幂等性设计

⚠️ 问题描述

网络超时导致商户重试、回调重复通知,如何防止重复扣款?

✅ 解决方案:唯一幂等键 + 数据库唯一索引
1
生成幂等键
merchant_id + order_no
2
Redis 预检查
SETNX 幂等键
3
数据库约束
UNIQUE 索引

# 幂等性实现代码
def create_payment(self, request):
    # 生成幂等键
    idempotent_key = f"idem:{request.merchant_id}:{request.order_no}"
    
    # Redis 原子操作获取锁
    acquired = redis.set(idempotent_key, "1", nx=True, ex=3600)
    if not acquired:
        # 已存在,返回之前的结果(幂等)
        return self.get_previous_result(request.order_no)
    
    try:
        # 数据库唯一索引兜底
        payment = Payment.objects.create(
            merchant_id=request.merchant_id,
            order_no=request.order_no,
            amount=request.amount
        )
        return self.process_payment(payment)
    except IntegrityError:
        # 唯一索引冲突,返回已存在的订单
        return self.get_existing_payment(request.order_no)
    finally:
        # 释放锁(异步任务处理)
        pass
                        
⚠️ 关键:Redis 和数据库双重保障,防止并发重复请求

🔀 3. 支付通道故障自动切换

⚠️ 问题描述

某个银行通道宕机或响应超时,如何快速切换到备用通道,保证支付不中断?

✅ 解决方案:熔断器 + 智能路由
📊
实时监控
成功率/响应时间
熔断触发
失败率>50%
🔄
自动切换
路由到备用通道
📈
半开探测
定时恢复测试

# 熔断器实现
class CircuitBreaker:
    def __init__(self, failure_threshold=5, recovery_timeout=60):
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.state = "CLOSED"  # CLOSED/OPEN/HALF_OPEN
        self.last_failure_time = None
        self.recovery_timeout = recovery_timeout
    
    def execute(self, channel, payment_func):
        if self.state == "OPEN":
            # 检查是否可以进入半开状态
            if time.now() - self.last_failure_time > self.recovery_timeout:
                self.state = "HALF_OPEN"
            else:
                raise CircuitBreakerOpenError("通道已熔断")
        
        try:
            result = payment_func()
            if self.state == "HALF_OPEN":
                self.state = "CLOSED"  # 成功,恢复通道
                self.failure_count = 0
            return result
        except Exception as e:
            self.failure_count += 1
            self.last_failure_time = time.now()
            if self.failure_count >= self.failure_threshold:
                self.state = "OPEN"  # 失败过多,打开熔断
            raise e

# 智能路由选择
def select_channel(payment_request):
    channels = get_available_channels(payment_request)
    # 按优先级排序:权重 > 成功率 > 响应时间
    sorted_channels = sorted(
        channels, 
        key=lambda c: (c.weight, c.success_rate, -c.avg_response_time),
        reverse=True
    )
    for channel in sorted_channels:
        if channel.circuit_breaker.state != "OPEN":
            return channel
    raise NoAvailableChannelError("所有通道不可用")
                        
💡 核心:熔断器三态转换(CLOSED→OPEN→HALF_OPEN),自动恢复机制

📢 4. 回调通知可靠性保证

⚠️ 问题描述

商户服务器宕机、网络异常导致回调失败,如何确保商户最终收到通知?

✅ 解决方案:可靠消息队列 + 指数退避重试

# 回调通知重试机制
class NotifyService:
    # 重试策略:指数退避
    RETRY_DELAYS = [1, 5, 30, 120, 600, 1800]  # 秒
    MAX_RETRIES = 6
    
    def send_notify(self, order):
        notify_record = NotifyRecord.create(
            order_id=order.id,
            notify_url=order.notify_url,
            status="PENDING"
        )
        # 发送到消息队列
        mq.send("payment.notify", {
            "order_id": order.id,
            "notify_record_id": notify_record.id
        })
    
    def process_notify(self, message):
        notify_record = NotifyRecord.get(message.notify_record_id)
        
        for attempt in range(1, self.MAX_RETRIES + 1):
            try:
                response = http.post(
                    notify_record.notify_url,
                    data=self.build_notify_data(notify_record),
                    timeout=10
                )
                if response.status_code == 200:
                    notify_record.update(status="SUCCESS")
                    return
            except Exception as e:
                # 记录失败日志
                notify_record.add_log(f"第{attempt}次失败:{e}")
            
            # 等待下次重试(指数退避)
            if attempt < self.MAX_RETRIES:
                delay = self.RETRY_DELAYS[attempt - 1]
                # 加入随机抖动,避免雪崩
                jitter = random.uniform(0, delay * 0.2)
                time.sleep(delay + jitter)
        
        # 所有重试失败,标记为失败并告警
        notify_record.update(status="FAILED")
        alert_service.send(f"回调通知失败:{notify_record.order_id}")
                        
📊 重试策略:1s → 5s → 30s → 2min → 10min → 30min,总计约 45 分钟

5. 高并发性能优化

⚠️ 问题描述

大促期间 QPS 暴增 10 倍,如何保证系统不崩溃、响应不超时?

✅ 解决方案:多层缓存 + 异步化 + 限流降级
优化手段 具体实现 效果
多级缓存本地缓存 (Caffeine) + Redis热点数据访问 < 1ms
异步处理非核心逻辑异步化(通知、记账)主链路 RT 降低 60%
限流熔断令牌桶限流 + 自适应熔断防止雪崩,保护系统
数据库优化读写分离 + 分库分表支持 10 万 + TPS
连接池优化HikariCP + Redis 连接池减少连接创建开销
批量处理批量查询/写入,减少 IO 次数吞吐量提升 3 倍

# 多级缓存实现
class CacheService:
    def get_merchant_config(self, merchant_id):
        # 1. 本地缓存(L1)
        config = local_cache.get(f"merchant:{merchant_id}")
        if config:
            return config
        
        # 2. Redis 缓存(L2)
        config = redis.get(f"merchant:{merchant_id}")
        if config:
            local_cache.set(f"merchant:{merchant_id}", config, ttl=60)
            return config
        
        # 3. 数据库查询
        config = db.query("SELECT * FROM merchant_config WHERE merchant_id = ?", merchant_id)
        if config:
            redis.set(f"merchant:{merchant_id}", config, ex=300)
            local_cache.set(f"merchant:{merchant_id}", config, ttl=60)
        return config

# 限流实现(令牌桶)
class RateLimiter:
    def __init__(self, capacity, refill_rate):
        self.capacity = capacity  # 桶容量
        self.tokens = capacity
        self.refill_rate = refill_rate  # 每秒补充令牌数
        self.last_refill = time.now()
    
    def try_acquire(self):
        # 补充令牌
        now = time.now()
        elapsed = now - self.last_refill
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate)
        self.last_refill = now
        
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False  # 限流
                        
💡 核心:80% 流量走缓存,20% 核心逻辑走数据库,异步处理非核心业务

🛠️ 技术栈推荐

层次 技术选型 说明
接入层Nginx + Kong负载均衡、限流、鉴权
网关层Spring Cloud Gateway路由、熔断、降级
服务层Spring Boot + Dubbo微服务架构
数据库MySQL + Redis关系型 + 缓存
消息队列RocketMQ / Kafka异步解耦、削峰
监控Prometheus + Grafana指标监控、告警
链路追踪SkyWalking全链路追踪

🚀 部署架构

☁️
可用区 A
主活
🔄
双活同步
数据实时复制
☁️
可用区 B
备活
📊 多活容灾 ⚡ 故障切换 < 30s 💾 数据零丢失