📦 核心模块设计
1️⃣ 订单中心
负责订单的创建、状态管理、生命周期控制
-- Payment order table used by the order center: one row per order.
-- NOTE(review): inline INDEX clauses suggest MySQL dialect — confirm target database.
CREATE TABLE payment_order (
id BIGINT PRIMARY KEY,
-- NOTE(review): order_no is unique across the whole table, not per merchant —
-- confirm two merchants can never submit the same order_no.
order_no VARCHAR(32) UNIQUE NOT NULL,
pay_order_no VARCHAR(32) UNIQUE, -- nullable; presumably the gateway-side payment number — confirm
merchant_id VARCHAR(32) NOT NULL,
amount DECIMAL(12,2) NOT NULL, -- fixed-point, two decimal places
currency VARCHAR(3) DEFAULT 'CNY', -- defaults to 'CNY' when omitted; NULL is still permitted
status TINYINT NOT NULL, -- numeric state code; the code-to-state mapping is not defined here
channel VARCHAR(32), -- nullable
create_time DATETIME NOT NULL,
update_time DATETIME NOT NULL,
expire_time DATETIME, -- nullable
INDEX idx_merchant (merchant_id, create_time), -- composite index: merchant_id, then create_time
INDEX idx_status (status, update_time) -- composite index: status, then update_time
);
2️⃣ 支付路由
智能选择最优支付通道,支持多策略路由
3️⃣ 通道适配层
统一不同银行/支付机构的接口差异
class PaymentChannelAdapter:
    """Common interface that every payment channel adapter exposes.

    The methods defined here are placeholders: each one does nothing and
    returns ``None``. Channel-specific subclasses override the operations
    they support.
    """

    def create_order(self, request) -> OrderResult:
        """Create a payment order."""
        return None

    def query_order(self, order_no) -> QueryResult:
        """Query the status of an order."""
        return None

    def refund(self, refund_request) -> RefundResult:
        """Process a refund."""
        return None

    def verify_callback(self, callback_data) -> bool:
        """Verify the signature of a callback."""
        return None
class AlipayAdapter(PaymentChannelAdapter):
    """Channel adapter that delegates to ``alipay_client``.

    Only ``create_order`` and ``verify_callback`` are overridden here; the
    remaining operations fall through to the base class.
    """

    def create_order(self, request):
        """Create the order via ``alipay_client.trade_create`` and return its result."""
        order_result = alipay_client.trade_create(request)
        return order_result

    def verify_callback(self, callback_data):
        """Return whatever ``alipay_client.verify`` reports for ``callback_data``."""
        verification = alipay_client.verify(callback_data)
        return verification
4️⃣ 风控中心
实时风险识别与拦截
🛡️ 风控规则引擎
- 黑名单检查:用户/商户/设备黑名单
- 限额控制:单笔/单日/单月限额
- 频次控制:单位时间内交易次数
- 异常检测:异地支付、大额异常
- 智能评分:机器学习模型实时评分
5️⃣ 对账中心
确保资金安全,发现并处理差异
🗄️ 核心表设计
支付流水表
-- Payment flow (transaction journal) table.
-- order_no is indexed but not unique, so several flow rows may share one order_no.
CREATE TABLE payment_flow (
id BIGINT PRIMARY KEY,
flow_no VARCHAR(32) UNIQUE NOT NULL, -- required and unique per flow row
order_no VARCHAR(32) NOT NULL,
merchant_id VARCHAR(32) NOT NULL,
channel VARCHAR(32) NOT NULL,
channel_order_no VARCHAR(64), -- nullable; presumably the channel-side order number — confirm
amount DECIMAL(12,2) NOT NULL, -- fixed-point, two decimal places
fee DECIMAL(10,2) DEFAULT 0, -- defaults to 0 when omitted
status TINYINT NOT NULL, -- numeric state code; the code-to-state mapping is not defined here
error_code VARCHAR(32), -- nullable
error_msg VARCHAR(512), -- nullable
-- NOTE(review): request_data / response_data look like raw payload snapshots —
-- confirm they never hold unmasked sensitive data.
request_data TEXT,
response_data TEXT,
create_time DATETIME NOT NULL,
complete_time DATETIME, -- nullable
INDEX idx_order (order_no), -- lookup by order_no
INDEX idx_channel (channel, create_time) -- composite index: channel, then create_time
);
商户配置表
-- Merchant configuration table: one row per merchant_id (enforced by UNIQUE).
CREATE TABLE merchant_config (
id BIGINT PRIMARY KEY,
merchant_id VARCHAR(32) UNIQUE NOT NULL,
merchant_name VARCHAR(128) NOT NULL,
api_key VARCHAR(64) UNIQUE NOT NULL, -- unique across merchants
-- NOTE(review): confirm secret_key is stored encrypted rather than in plaintext.
secret_key VARCHAR(128) NOT NULL,
notify_url VARCHAR(256), -- nullable
status TINYINT NOT NULL, -- numeric state code; the code-to-state mapping is not defined here
rate DECIMAL(5,4) DEFAULT 0.006, -- four decimal places; default 0.006 (presumably a 0.6% fee rate — confirm)
settle_mode TINYINT DEFAULT 1, -- default 1; the meaning of each mode is not defined here
create_time DATETIME NOT NULL,
update_time DATETIME NOT NULL
);
通道配置表
-- Channel configuration table: one row per channel_code (enforced by UNIQUE).
-- NOTE(review): there is no update_time column here, unlike merchant_config — confirm that is intentional.
CREATE TABLE channel_config (
id BIGINT PRIMARY KEY,
channel_code VARCHAR(32) UNIQUE NOT NULL,
channel_name VARCHAR(64) NOT NULL,
provider VARCHAR(64) NOT NULL,
config_data JSON NOT NULL, -- required JSON document; its schema is not defined here
status TINYINT NOT NULL, -- numeric state code; the code-to-state mapping is not defined here
weight INT DEFAULT 100, -- default 100; presumably the routing weight — confirm
daily_limit DECIMAL(15,2), -- nullable
single_limit DECIMAL(12,2), -- nullable
support_methods JSON, -- nullable JSON; its schema is not defined here
create_time DATETIME NOT NULL
);
📡 核心 API 设计
POST /api/v1/payment/pay
Content-Type: application/json
{
"merchant_id": "M20240101001",
"order_no": "ORD20240101123456",
"amount": "100.00",
"currency": "CNY",
"subject": "商品描述",
"body": "商品详情",
"notify_url": "https://merchant.com/notify",
"return_url": "https://merchant.com/return",
"pay_method": "ALIPAY",
"timestamp": "1704067200",
"sign": "abc123..."
}
{
"code": "SUCCESS",
"message": "success",
"data": {
"pay_order_no": "P20240101123456",
"pay_url": "https://pay.gateway.com/h5/xxx",
"qr_code": "data:image/png;base64,..."
}
}
🔥🔥🔥 核心难点与解决方案
🔄 1. 分布式事务一致性
⚠️ 问题描述
支付涉及多个服务:订单服务、账户服务、通道服务,如何保证数据一致性?
class PaymentService:
    """Try / Confirm / Cancel steps of a payment.

    Each step first calls ``account_service`` and then writes the matching
    status through ``payment_record``; the order of those two calls is kept
    exactly as written.
    """

    def try_payment(self, order_id, amount):
        """Freeze ``amount`` for ``order_id`` and create a record in status TRYING.

        Returns:
            Always ``True`` when both calls complete without raising.
        """
        account_service.freeze_balance(order_id, amount)
        # NOTE(review): the record is created without order_id, while
        # confirm/cancel update it by order_id — confirm how they are linked.
        initial_status = "TRYING"
        payment_record.create(status=initial_status)
        return True

    def confirm_payment(self, order_id):
        """Deduct the frozen balance and mark the record SUCCESS."""
        account_service.deduct_frozen(order_id)
        final_status = "SUCCESS"
        payment_record.update(order_id, status=final_status)

    def cancel_payment(self, order_id):
        """Release the frozen balance and mark the record CANCEL."""
        account_service.unfreeze_balance(order_id)
        final_status = "CANCEL"
        payment_record.update(order_id, status=final_status)
💡 核心:Try 阶段预留资源,Confirm/Cancel 幂等执行,支持自动重试
🔁 2. 接口幂等性设计
⚠️ 问题描述
网络超时导致商户重试、回调重复通知,如何防止重复扣款?
def create_payment(self, request):
    """Create and process a payment at most once per (merchant_id, order_no).

    A Redis ``SET NX`` key with a one-hour TTL rejects concurrent or repeated
    requests; an ``IntegrityError`` from the insert is the second line of
    defence once the key has expired or been released.

    Args:
        request: Object exposing ``merchant_id``, ``order_no`` and ``amount``.

    Returns:
        The result of ``process_payment`` for a first-time request, otherwise
        the previously stored result / existing payment for ``order_no``.

    Raises:
        Exception: Any non-``IntegrityError`` failure from creating or
            processing the payment is re-raised after the key is released.
    """
    idempotent_key = f"idem:{request.merchant_id}:{request.order_no}"
    # Only the first caller within the TTL acquires the key.
    acquired = redis.set(idempotent_key, "1", nx=True, ex=3600)
    if not acquired:
        return self.get_previous_result(request.order_no)
    try:
        payment = Payment.objects.create(
            merchant_id=request.merchant_id,
            order_no=request.order_no,
            amount=request.amount,
        )
        return self.process_payment(payment)
    except IntegrityError:
        # A row for this order already exists; return it instead of failing.
        return self.get_existing_payment(request.order_no)
    except Exception:
        # Release the key so a retry is not short-circuited for the rest of
        # the TTL. If a row was already inserted, the retry lands in the
        # IntegrityError branch above, so nothing is created twice.
        redis.delete(idempotent_key)
        raise
⚠️ 关键:Redis 和数据库双重保障,防止并发重复请求
🔀 3. 支付通道故障自动切换
⚠️ 问题描述
某个银行通道宕机或响应超时,如何快速切换到备用通道,保证支付不中断?
class CircuitBreaker:
    """Three-state circuit breaker (CLOSED / OPEN / HALF_OPEN).

    The breaker opens after ``failure_threshold`` consecutive failures. Once
    ``recovery_timeout`` seconds have passed since the last failure, the next
    call is let through as a probe (HALF_OPEN): success closes the breaker,
    failure opens it again.
    """

    def __init__(self, failure_threshold=5, recovery_timeout=60):
        """Start in the CLOSED state with no recorded failures.

        Args:
            failure_threshold: Consecutive failures that open the breaker.
            recovery_timeout: Seconds to stay OPEN before allowing a probe.
        """
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.state = "CLOSED"
        # time.monotonic() reading of the most recent failure; None until one occurs.
        self.last_failure_time = None
        self.recovery_timeout = recovery_timeout

    def execute(self, channel, payment_func):
        """Run ``payment_func`` under the breaker and return its result.

        Args:
            channel: Accepted for interface compatibility; not used here.
            payment_func: Zero-argument callable performing the payment call.

        Raises:
            CircuitBreakerOpenError: The breaker is OPEN and the recovery
                timeout has not elapsed yet.
            Exception: Whatever ``payment_func`` raises, re-raised unchanged
                after the failure has been recorded.
        """
        if self.state == "OPEN":
            # The stdlib time module has no now(); a monotonic clock also keeps
            # the interval immune to wall-clock adjustments.
            elapsed = time.monotonic() - self.last_failure_time
            if elapsed > self.recovery_timeout:
                self.state = "HALF_OPEN"
            else:
                raise CircuitBreakerOpenError("通道已熔断")
        try:
            result = payment_func()
        except Exception:
            self.failure_count += 1
            self.last_failure_time = time.monotonic()
            if self.failure_count >= self.failure_threshold:
                self.state = "OPEN"
            # Bare raise keeps the original traceback intact.
            raise
        if self.state == "HALF_OPEN":
            self.state = "CLOSED"
        # Any success clears the streak so only consecutive failures trip the breaker.
        self.failure_count = 0
        return result
def select_channel(payment_request):
    """Return the best-ranked channel whose circuit breaker is not OPEN.

    Candidates come from ``get_available_channels`` and are ranked by higher
    ``weight``, then higher ``success_rate``, then lower ``avg_response_time``.

    Raises:
        NoAvailableChannelError: Every candidate's breaker is OPEN, or there
            are no candidates at all.
    """

    def ranking(candidate):
        # Response time is negated so that, under a descending sort, faster
        # channels come first.
        return (
            candidate.weight,
            candidate.success_rate,
            -candidate.avg_response_time,
        )

    candidates = get_available_channels(payment_request)
    ranked = sorted(candidates, key=ranking, reverse=True)
    chosen = next(
        (item for item in ranked if item.circuit_breaker.state != "OPEN"),
        None,
    )
    if chosen is None:
        raise NoAvailableChannelError("所有通道不可用")
    return chosen
💡 核心:熔断器三态转换(CLOSED→OPEN→HALF_OPEN),自动恢复机制
📢 4. 回调通知可靠性保证
⚠️ 问题描述
商户服务器宕机、网络异常导致回调失败,如何确保商户最终收到通知?
class NotifyService:
    """Deliver payment result notifications to merchants with retries.

    ``send_notify`` persists a PENDING record and enqueues a message;
    ``process_notify`` handles that message by POSTing to the merchant and
    retrying with increasing delays until it gets HTTP 200 or gives up.
    """

    # Seconds to wait before retry 1..6: 1s, 5s, 30s, 2min, 10min, 30min.
    RETRY_DELAYS = [1, 5, 30, 120, 600, 1800]
    # Number of retries after the initial attempt, so every delay above is used.
    MAX_RETRIES = 6

    def send_notify(self, order):
        """Record a pending notification for ``order`` and enqueue it."""
        notify_record = NotifyRecord.create(
            order_id=order.id,
            notify_url=order.notify_url,
            status="PENDING",
        )
        mq.send("payment.notify", {
            "order_id": order.id,
            "notify_record_id": notify_record.id,
        })

    def process_notify(self, message):
        """POST the notification, retrying until HTTP 200 or retries run out.

        One initial attempt is followed by up to ``MAX_RETRIES`` retries, each
        preceded by the matching ``RETRY_DELAYS`` entry plus up to 20% jitter.
        Every failed attempt is logged on the record, whether it raised or
        returned a non-200 status. When all attempts fail the record is marked
        FAILED and an alert is sent.

        Args:
            message: Queue message exposing ``notify_record_id``.
        """
        notify_record = NotifyRecord.get(message.notify_record_id)
        total_attempts = self.MAX_RETRIES + 1
        for attempt in range(1, total_attempts + 1):
            if attempt > 1:
                self._wait_before_retry(attempt - 1)
            try:
                response = http.post(
                    notify_record.notify_url,
                    data=self.build_notify_data(notify_record),
                    timeout=10,
                )
                if response.status_code == 200:
                    notify_record.update(status="SUCCESS")
                    return
                # A non-200 reply is a failed attempt too; log it for diagnosis.
                notify_record.add_log(
                    f"第{attempt}次失败:HTTP {response.status_code}"
                )
            except Exception as e:
                notify_record.add_log(f"第{attempt}次失败:{e}")
        notify_record.update(status="FAILED")
        alert_service.send(f"回调通知失败:{notify_record.order_id}")

    def _wait_before_retry(self, retry_number):
        """Sleep for the delay configured for the 1-based ``retry_number``."""
        if not self.RETRY_DELAYS:
            return
        # Reuse the last delay if MAX_RETRIES exceeds the configured schedule.
        index = min(retry_number, len(self.RETRY_DELAYS)) - 1
        delay = self.RETRY_DELAYS[index]
        # Jitter spreads out retries that would otherwise fire together.
        jitter = random.uniform(0, delay * 0.2)
        time.sleep(delay + jitter)
📊 重试策略:1s → 5s → 30s → 2min → 10min → 30min,总计约 45 分钟
⚡ 5. 高并发性能优化
⚠️ 问题描述
大促期间 QPS 暴增 10 倍,如何保证系统不崩溃、响应不超时?
| 优化手段 | 具体实现 | 效果 |
| --- | --- | --- |
| 多级缓存 | 本地缓存 (Caffeine) + Redis | 热点数据访问 < 1ms |
| 异步处理 | 非核心逻辑异步化(通知、记账) | 主链路 RT 降低 60% |
| 限流熔断 | 令牌桶限流 + 自适应熔断 | 防止雪崩,保护系统 |
| 数据库优化 | 读写分离 + 分库分表 | 支持 10 万+ TPS |
| 连接池优化 | HikariCP + Redis 连接池 | 减少连接创建开销 |
| 批量处理 | 批量查询/写入,减少 IO 次数 | 吞吐量提升 3 倍 |
class CacheService:
    """Read-through lookup of merchant configuration across two cache tiers."""

    def get_merchant_config(self, merchant_id):
        """Return the merchant's config from the first tier that has it.

        Lookup order is local cache, then Redis, then the database. A hit in a
        slower tier is copied into the faster ones (local TTL 60, Redis
        expiry 300). A miss in the database is returned as-is and is not cached.
        """
        cache_key = f"merchant:{merchant_id}"

        local_hit = local_cache.get(cache_key)
        if local_hit:
            return local_hit

        redis_hit = redis.get(cache_key)
        if redis_hit:
            local_cache.set(cache_key, redis_hit, ttl=60)
            return redis_hit

        row = db.query(
            "SELECT * FROM merchant_config WHERE merchant_id = ?",
            merchant_id,
        )
        if row:
            redis.set(cache_key, row, ex=300)
            local_cache.set(cache_key, row, ttl=60)
        return row
class RateLimiter:
    """Token-bucket rate limiter.

    The bucket starts full with ``capacity`` tokens and refills continuously
    at ``refill_rate`` tokens per second, never exceeding ``capacity``.
    """

    def __init__(self, capacity, refill_rate):
        """Create a full bucket.

        Args:
            capacity: Maximum number of tokens the bucket can hold.
            refill_rate: Tokens added per second of elapsed time.
        """
        self.capacity = capacity
        self.tokens = capacity
        self.refill_rate = refill_rate
        # The stdlib time module has no now(); a monotonic clock also keeps
        # elapsed time from going negative when the wall clock is adjusted.
        self.last_refill = time.monotonic()

    def try_acquire(self):
        """Refill for the elapsed time, then take one token if available.

        Returns:
            ``True`` if a token was consumed, ``False`` if the bucket held
            less than one token.
        """
        now = time.monotonic()
        elapsed = now - self.last_refill
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate)
        self.last_refill = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False
💡 核心:80% 流量走缓存,20% 核心逻辑走数据库,异步处理非核心业务