
django-patterns
Popular · Django 架构模式、使用 DRF 的 REST API 设计、ORM 最佳实践、缓存、信号、中间件以及生产级 Django 应用程序。
40K stars
5K forks
Updated 2/5/2026
SKILL.md
read-only
name
django-patterns
description
Django架构模式、使用DRF的REST API设计、ORM最佳实践、缓存、信号、中间件以及生产级Django应用程序。
Django 开发模式
适用于可扩展、可维护应用程序的生产级 Django 架构模式。
何时激活
- 构建 Django Web 应用程序时
- 设计 Django REST Framework API 时
- 使用 Django ORM 和模型时
- 设置 Django 项目结构时
- 实现缓存、信号、中间件时
项目结构
推荐布局
myproject/
├── config/
│   ├── __init__.py
│   ├── settings/
│   │   ├── __init__.py
│   │   ├── base.py          # Base settings
│   │   ├── development.py   # Dev settings
│   │   ├── production.py    # Production settings
│   │   └── test.py          # Test settings
│   ├── urls.py
│   ├── wsgi.py
│   └── asgi.py
├── manage.py
└── apps/
    ├── __init__.py
    ├── users/
    │   ├── __init__.py
    │   ├── models.py
    │   ├── views.py
    │   ├── serializers.py
    │   ├── urls.py
    │   ├── permissions.py
    │   ├── filters.py
    │   ├── services.py
    │   └── tests/
    └── products/
        └── ...
拆分设置模式
# config/settings/base.py
"""Base settings shared by every environment-specific settings module."""
from pathlib import Path

# NOTE(review): `env` is called below but is never defined or imported in
# this snippet. The call shapes (`env('X')`, `env('X', default=...)`) look
# like a django-environ style reader -- confirm and create it before first use.

# Three parents up from config/settings/base.py.
BASE_DIR = Path(__file__).resolve().parent.parent.parent

SECRET_KEY = env('DJANGO_SECRET_KEY')

# Conservative defaults; environment modules override them.
DEBUG = False
ALLOWED_HOSTS = []

INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'rest_framework',
    'rest_framework.authtoken',
    'corsheaders',
    # Local apps
    'apps.users',
    'apps.products',
]

MIDDLEWARE = [
    'django.middleware.security.SecurityMiddleware',
    # CorsMiddleware has to run before any middleware that can produce a
    # response on its own (WhiteNoise, CommonMiddleware); otherwise those
    # responses are returned without CORS headers.
    'corsheaders.middleware.CorsMiddleware',
    'whitenoise.middleware.WhiteNoiseMiddleware',
    'django.contrib.sessions.middleware.SessionMiddleware',
    'django.middleware.common.CommonMiddleware',
    'django.middleware.csrf.CsrfViewMiddleware',
    'django.contrib.auth.middleware.AuthenticationMiddleware',
    'django.contrib.messages.middleware.MessageMiddleware',
    'django.middleware.clickjacking.XFrameOptionsMiddleware',
]

ROOT_URLCONF = 'config.urls'
WSGI_APPLICATION = 'config.wsgi.application'

# Connection parameters come from the environment; only the port has a
# fallback value.
DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.postgresql',
        'NAME': env('DB_NAME'),
        'USER': env('DB_USER'),
        'PASSWORD': env('DB_PASSWORD'),
        'HOST': env('DB_HOST'),
        'PORT': env('DB_PORT', default='5432'),
    }
}
# config/settings/development.py
"""Development overrides layered on top of the base settings."""
from .base import *  # noqa: F401,F403 -- settings modules re-export base on purpose

DEBUG = True
ALLOWED_HOSTS = ['localhost', '127.0.0.1']

DATABASES['default']['NAME'] = 'myproject_dev'

# Build new lists instead of using `+=`, which would mutate the list objects
# owned by the base module.
INSTALLED_APPS = [*INSTALLED_APPS, 'debug_toolbar']
MIDDLEWARE = [*MIDDLEWARE, 'debug_toolbar.middleware.DebugToolbarMiddleware']

# With its default configuration the debug toolbar is only shown to clients
# whose address is listed here.
INTERNAL_IPS = ['127.0.0.1']

# Print outgoing mail to the console instead of sending it.
EMAIL_BACKEND = 'django.core.mail.backends.console.EmailBackend'
# config/settings/production.py
"""Production overrides: HTTPS-only cookies and redirects, HSTS, file logging."""
from .base import *

DEBUG = False
ALLOWED_HOSTS = env.list('ALLOWED_HOSTS')

# Transport security.
SECURE_SSL_REDIRECT = True
SESSION_COOKIE_SECURE = True
CSRF_COOKIE_SECURE = True

# HSTS for one year (365 days expressed in seconds).
SECURE_HSTS_SECONDS = 365 * 24 * 60 * 60
SECURE_HSTS_INCLUDE_SUBDOMAINS = True
SECURE_HSTS_PRELOAD = True

# Logging: WARNING and above from the `django` logger go to a single file.
LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'handlers': {
        'file': {
            'level': 'WARNING',
            'class': 'logging.FileHandler',
            'filename': '/var/log/django/django.log',
        },
    },
    'loggers': {
        'django': {
            'handlers': ['file'],
            'level': 'WARNING',
            'propagate': True,
        },
    },
}
模型设计模式
模型最佳实践
from django.db import models
from django.contrib.auth.models import AbstractUser
from django.core.validators import MinValueValidator, MaxValueValidator
class User(AbstractUser):
    """User account built on ``AbstractUser`` with e-mail as the login field.

    Adds a unique ``email`` plus optional ``phone`` and ``birth_date``.
    """

    email = models.EmailField(unique=True)
    phone = models.CharField(blank=True, max_length=20)
    birth_date = models.DateField(blank=True, null=True)

    # `email` identifies the account; `username` stays a required field.
    USERNAME_FIELD = 'email'
    REQUIRED_FIELDS = ['username']

    class Meta:
        db_table = 'users'
        verbose_name = 'user'
        verbose_name_plural = 'users'
        ordering = ['-date_joined']  # newest accounts first

    def __str__(self):
        return self.email

    def get_full_name(self):
        """Return first and last name separated by a space, trimmed."""
        combined = f"{self.first_name} {self.last_name}"
        return combined.strip()
class Product(models.Model):
    """Catalogue product with price, stock, category and tags.

    ``slug`` is filled in from ``name`` on save when it is empty.
    """

    name = models.CharField(max_length=200)
    slug = models.SlugField(unique=True, max_length=250)
    description = models.TextField(blank=True)
    price = models.DecimalField(
        max_digits=10,
        decimal_places=2,
        validators=[MinValueValidator(0)],
    )
    stock = models.PositiveIntegerField(default=0)
    is_active = models.BooleanField(default=True)
    category = models.ForeignKey(
        'Category',
        on_delete=models.CASCADE,
        related_name='products',
    )
    tags = models.ManyToManyField('Tag', blank=True, related_name='products')
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        db_table = 'products'
        ordering = ['-created_at']
        indexes = [
            # No separate index on `slug`: `unique=True` already creates one.
            models.Index(fields=['-created_at']),
            models.Index(fields=['category', 'is_active']),
        ]
        constraints = [
            # Database-level counterpart of the MinValueValidator on `price`.
            models.CheckConstraint(
                check=models.Q(price__gte=0),
                name='price_non_negative',
            )
        ]

    def __str__(self):
        return self.name

    def save(self, *args, **kwargs):
        """Populate an empty ``slug`` from ``name``, then save as usual."""
        if not self.slug:
            # Imported here because the surrounding module never imports it.
            from django.utils.text import slugify

            self.slug = slugify(self.name)
        super().save(*args, **kwargs)
QuerySet 最佳实践
from django.db import models
class ProductQuerySet(models.QuerySet):
    """Chainable query helpers for the Product model."""

    def active(self):
        """Limit to rows with ``is_active=True``."""
        return self.filter(is_active=True)

    def with_category(self):
        """Join ``category`` in the same query via ``select_related``."""
        return self.select_related('category')

    def with_tags(self):
        """Load ``tags`` via ``prefetch_related``."""
        return self.prefetch_related('tags')

    def in_stock(self):
        """Limit to rows whose ``stock`` is greater than zero."""
        return self.filter(stock__gt=0)

    def search(self, query):
        """Case-insensitive substring match on name or description."""
        name_match = models.Q(name__icontains=query)
        description_match = models.Q(description__icontains=query)
        return self.filter(name_match | description_match)
class Product(models.Model):
    # ... fields ...
    # Manager built from ProductQuerySet, so its methods are callable on
    # `Product.objects` and can be chained.
    objects = ProductQuerySet.as_manager()  # Use custom QuerySet


# Usage: each helper returns a queryset, so the calls compose.
Product.objects.active().with_category().in_stock()
管理器方法
class ProductManager(models.Manager):
    """Custom manager for complex queries."""

    def get_or_none(self, **kwargs):
        """Return the single matching object, or None if there is none.

        Only ``DoesNotExist`` is swallowed; other errors raised by ``get``
        still propagate.
        """
        try:
            return self.get(**kwargs)
        except self.model.DoesNotExist:
            return None

    def create_with_tags(self, name, price, tag_names):
        """Create a product and attach one tag per entry in ``tag_names``.

        Missing tags are created. Everything runs in one transaction so a
        failure while resolving tags does not leave a product without tags.
        """
        from django.db import transaction

        # Resolve the tag model from the `tags` relation instead of relying
        # on a `Tag` name the surrounding module never imports.
        tag_model = self.model._meta.get_field('tags').related_model

        with transaction.atomic():
            product = self.create(name=name, price=price)
            # `tag_name` avoids shadowing the `name` parameter.
            tags = [
                tag_model.objects.get_or_create(name=tag_name)[0]
                for tag_name in tag_names
            ]
            product.tags.set(tags)
        return product

    def bulk_update_stock(self, product_ids, quantity):
        """Set ``stock`` to ``quantity`` for the given ids in one UPDATE.

        Returns whatever ``QuerySet.update`` returns.
        """
        return self.filter(id__in=product_ids).update(stock=quantity)
# In model
class Product(models.Model):
    # ... fields ...
    # Attach the custom manager under the name `custom`.
    # NOTE(review): if this is the only manager declared on the model, there
    # will presumably be no `Product.objects` -- confirm against the elided
    # fields.
    custom = ProductManager()
Django REST Framework 模式
序列化器模式
from rest_framework import serializers
from django.contrib.auth.password_validation import validate_password
from .models import Product, User
class ProductSerializer(serializers.ModelSerializer):
    """Serializer for Product model.

    Adds three read-only values on top of the model fields: the category
    name, an ``average_rating`` and a computed ``discount_price``.
    """

    category_name = serializers.CharField(source='category.name', read_only=True)
    # NOTE(review): `average_rating` is not a field set in this serializer;
    # presumably the queryset annotates it -- confirm.
    average_rating = serializers.FloatField(read_only=True)
    discount_price = serializers.SerializerMethodField()

    class Meta:
        model = Product
        fields = [
            'id', 'name', 'slug', 'description', 'price',
            'discount_price', 'stock', 'category_name',
            'average_rating', 'created_at',
        ]
        read_only_fields = ['id', 'slug', 'created_at']

    def get_discount_price(self, obj):
        """Return the price after discount, or the plain price without one.

        The percentage is converted to ``Decimal`` before the arithmetic:
        ``percent / 100`` with an int or float percent yields a float, and
        multiplying a ``Decimal`` price by a float raises ``TypeError``.
        """
        from decimal import Decimal

        discount = getattr(obj, 'discount', None)
        if not discount:
            return obj.price
        fraction = Decimal(str(discount.percent)) / Decimal(100)
        return obj.price * (Decimal(1) - fraction)

    def validate_price(self, value):
        """Reject negative prices."""
        if value < 0:
            raise serializers.ValidationError("Price cannot be negative.")
        return value
class ProductCreateSerializer(serializers.ModelSerializer):
    """Serializer for creating products."""

    class Meta:
        model = Product
        fields = ['name', 'description', 'price', 'stock', 'category']

    def validate(self, data):
        """Reject products that are both high-priced and heavily stocked.

        Reads the values with ``.get`` because a key may be absent from
        ``data`` (an omitted optional field, or a partial update); indexing
        would then raise ``KeyError`` instead of a validation error.
        """
        price = data.get('price')
        stock = data.get('stock', 0)
        if price is not None and price > 10000 and stock > 100:
            raise serializers.ValidationError(
                "Cannot have high-value products with large stock."
            )
        return data
class UserRegistrationSerializer(serializers.ModelSerializer):
    """Serializer for user registration.

    Takes ``password`` and ``password_confirm`` as write-only inputs; the
    confirmation is only compared and never stored.
    """

    password = serializers.CharField(
        write_only=True,
        required=True,
        validators=[validate_password],
        style={'input_type': 'password'},
    )
    password_confirm = serializers.CharField(
        write_only=True,
        style={'input_type': 'password'},
    )

    class Meta:
        model = User
        fields = ['email', 'username', 'password', 'password_confirm']

    def validate(self, data):
        """Require both password fields to be equal."""
        if data['password'] != data['password_confirm']:
            raise serializers.ValidationError({
                "password_confirm": "Password fields didn't match."
            })
        return data

    def create(self, validated_data):
        """Create the user with a hashed password.

        Uses the manager's ``create_user`` so the password is hashed before
        the single INSERT, instead of inserting a row and saving it a second
        time after ``set_password``.
        """
        validated_data.pop('password_confirm')
        password = validated_data.pop('password')
        return User.objects.create_user(password=password, **validated_data)
ViewSet 模式
from rest_framework import viewsets, status, filters
from rest_framework.decorators import action
from rest_framework.response import Response
from rest_framework.permissions import IsAuthenticated, IsAdminUser
from django_filters.rest_framework import DjangoFilterBackend
from .models import Product
from .serializers import ProductSerializer, ProductCreateSerializer
from .permissions import IsOwnerOrReadOnly
from .filters import ProductFilter
from .services import ProductService
class ProductViewSet(viewsets.ModelViewSet):
    """ViewSet for Product model.

    Standard CRUD plus three extra routes: ``featured``, ``purchase`` and
    ``my_products``.
    """

    queryset = Product.objects.select_related('category').prefetch_related('tags')
    permission_classes = [IsAuthenticated, IsOwnerOrReadOnly]
    filter_backends = [DjangoFilterBackend, filters.SearchFilter, filters.OrderingFilter]
    filterset_class = ProductFilter
    search_fields = ['name', 'description']
    ordering_fields = ['price', 'created_at', 'name']
    ordering = ['-created_at']

    def get_serializer_class(self):
        """Use the create serializer for ``create``, the full one otherwise."""
        if self.action == 'create':
            return ProductCreateSerializer
        return ProductSerializer

    def perform_create(self, serializer):
        """Save the new product with the requesting user as ``created_by``."""
        # NOTE(review): `created_by` must exist on Product -- confirm.
        serializer.save(created_by=self.request.user)

    @action(detail=False, methods=['get'])
    def featured(self, request):
        """Return at most ten featured products."""
        # Go through get_queryset() so overrides of it apply here as well.
        # NOTE(review): `is_featured` must exist on Product -- confirm.
        featured = self.get_queryset().filter(is_featured=True)[:10]
        serializer = self.get_serializer(featured, many=True)
        return Response(serializer.data)

    @action(detail=True, methods=['post'])
    def purchase(self, request, pk=None):
        """Purchase the addressed product for the requesting user."""
        product = self.get_object()
        service = ProductService()
        result = service.purchase(product, request.user)
        return Response(result, status=status.HTTP_201_CREATED)

    @action(detail=False, methods=['get'], permission_classes=[IsAuthenticated])
    def my_products(self, request):
        """Return products created by the current user.

        Paginated when a paginator is configured; a plain list otherwise.
        """
        products = self.get_queryset().filter(created_by=request.user)
        page = self.paginate_queryset(products)
        # paginate_queryset returns None when pagination is disabled; in that
        # case get_paginated_response must not be called.
        if page is not None:
            serializer = self.get_serializer(page, many=True)
            return self.get_paginated_response(serializer.data)
        serializer = self.get_serializer(products, many=True)
        return Response(serializer.data)
自定义操作
from rest_framework.decorators import api_view, permission_classes
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
@api_view(['POST'])
@permission_classes([IsAuthenticated])
def add_to_cart(request):
    """Add a product to the requesting user's cart.

    Reads ``product_id`` and an optional ``quantity`` (default 1) from the
    request body. Responds 400 for a quantity that is not a positive
    integer, 404 when the product cannot be found, and 201 on success.
    """
    # Imported here because the surrounding module never imports `status`.
    from rest_framework import status

    # NOTE(review): Product, Cart and CartItem are not imported in this
    # snippet -- import them from the owning apps.
    product_id = request.data.get('product_id')

    # The quantity comes straight from the client; coerce and bound it.
    try:
        quantity = int(request.data.get('quantity', 1))
    except (TypeError, ValueError):
        quantity = 0
    if quantity < 1:
        return Response(
            {'error': 'Quantity must be a positive integer'},
            status=status.HTTP_400_BAD_REQUEST
        )

    try:
        product = Product.objects.get(id=product_id)
    except (Product.DoesNotExist, TypeError, ValueError):
        # A malformed id (e.g. a non-numeric string) is reported the same
        # way as an unknown one instead of surfacing as a server error.
        return Response(
            {'error': 'Product not found'},
            status=status.HTTP_404_NOT_FOUND
        )

    cart, _ = Cart.objects.get_or_create(user=request.user)
    CartItem.objects.create(
        cart=cart,
        product=product,
        quantity=quantity
    )
    return Response({'message': 'Added to cart'}, status=status.HTTP_201_CREATED)
服务层模式
# apps/orders/services.py
from typing import Optional
from django.db import transaction
from .models import Order, OrderItem
class OrderService:
    """Service layer for order-related business logic."""

    @staticmethod
    @transaction.atomic
    def create_order(user, cart: "Cart") -> Order:
        """Create an order from ``cart`` and empty the cart.

        Runs inside ``transaction.atomic``. One OrderItem is created per
        cart item, priced at the product's current price.
        """
        # The annotation is a string because `Cart` is not imported by the
        # surrounding module; an unquoted name would raise NameError when
        # the class body is executed.
        order = Order.objects.create(
            user=user,
            total_price=cart.total_price
        )
        # select_related loads each item's product in the same query rather
        # than issuing one extra query per item.
        for item in cart.items.select_related('product'):
            OrderItem.objects.create(
                order=order,
                product=item.product,
                quantity=item.quantity,
                price=item.product.price
            )
        # Clear cart
        cart.items.all().delete()
        return order

    @staticmethod
    def process_payment(order: Order, payment_data: dict) -> bool:
        """Charge the order total; return True if the charge succeeded.

        On success the order is marked PAID, saved, and a confirmation
        e-mail is requested. ``payment_data`` must contain ``'token'``.
        """
        # NOTE(review): PaymentGateway is not imported in this snippet --
        # confirm where it lives.
        payment = PaymentGateway.charge(
            amount=order.total_price,
            token=payment_data['token']
        )
        if not payment.success:
            return False
        order.status = Order.Status.PAID
        order.save()
        # Send confirmation email
        OrderService.send_confirmation_email(order)
        return True

    @staticmethod
    def send_confirmation_email(order: Order):
        """Send order confirmation email (placeholder; does nothing yet)."""
        # Email sending logic
        pass
缓存策略
视图级缓存
from django.utils.decorators import method_decorator
# `generic` is used as the base-class namespace below but was never imported.
from django.views import generic
from django.views.decorators.cache import cache_page


# Cache the whole response of every request handled through dispatch().
@method_decorator(cache_page(60 * 15), name='dispatch')  # 15 minutes
class ProductListView(generic.ListView):
    """List view of products rendered with ``products/list.html``."""

    # NOTE(review): Product is not imported in this snippet -- import it
    # from the owning app.
    model = Product
    template_name = 'products/list.html'
    context_object_name = 'products'
模板片段缓存
{% load cache %}
{% cache 500 sidebar %}
... expensive sidebar content ...
{% endcache %}
低级缓存
from django.core.cache import cache
def get_featured_products():
    """Return the featured products, served from the cache when present.

    On a miss the queryset is evaluated to a list, stored under
    ``'featured_products'`` for 15 minutes, and returned.
    """
    key = 'featured_products'
    cached = cache.get(key)
    if cached is not None:
        return cached

    fresh = list(Product.objects.filter(is_featured=True))
    cache.set(key, fresh, timeout=60 * 15)  # 15 minutes
    return fresh
QuerySet 缓存
from django.core.cache import cache
def get_popular_categories():
    """Return up to 20 categories with more than 10 products, largest first.

    The result list is cached under ``'popular_categories'`` for one hour.
    """
    # Imported here because the surrounding module never imports `Count`.
    from django.db.models import Count

    # NOTE(review): Category is not imported in this snippet -- import it
    # from the owning app.
    cache_key = 'popular_categories'
    categories = cache.get(cache_key)
    if categories is None:
        categories = list(
            Category.objects
            .annotate(product_count=Count('products'))
            .filter(product_count__gt=10)
            .order_by('-product_count')[:20]
        )
        cache.set(cache_key, categories, timeout=60 * 60)  # 1 hour
    return categories
信号
信号模式
# apps/users/signals.py
from django.db.models.signals import post_save
from django.dispatch import receiver
from django.contrib.auth import get_user_model
from .models import Profile
User = get_user_model()
@receiver(post_save, sender=User)
def create_user_profile(sender, instance, created, **kwargs):
    """Create a Profile for a user the first time that user is saved."""
    if not created:
        return
    Profile.objects.create(user=instance)
@receiver(post_save, sender=User)
def save_user_profile(sender, instance, **kwargs):
    """Save the user's profile whenever an existing user is saved.

    Does nothing for a newly created user, and nothing for a user that has
    no profile, instead of raising on the ``instance.profile`` access.
    """
    if kwargs.get('created'):
        # A brand-new user has no profile changes that need persisting.
        return
    # NOTE(review): assumes `profile` is the reverse accessor of a one-to-one
    # relation, whose "missing" error is an AttributeError -- confirm.
    profile = getattr(instance, 'profile', None)
    if profile is not None:
        profile.save()
# apps/users/apps.py
from django.apps import AppConfig
class UsersConfig(AppConfig):
    """App configuration for ``apps.users``."""

    name = 'apps.users'
    default_auto_field = 'django.db.models.BigAutoField'

    def ready(self):
        """Import the signals module so its receivers get registered."""
        from . import signals  # noqa: F401 -- imported for its side effects
中间件
自定义中间件
# middleware/active_user_middleware.py
import time
from django.utils.deprecation import MiddlewareMixin
class ActiveUserMiddleware(MiddlewareMixin):
    """Middleware to track active users."""

    def process_request(self, request):
        """Stamp ``last_active`` on the authenticated user of this request.

        Anonymous requests are left untouched. Only the ``last_active``
        column is written.
        """
        # Imported here because the surrounding module never imports it.
        from django.utils import timezone

        # NOTE(review): `last_active` must exist on the user model -- confirm.
        user = request.user
        if user.is_authenticated:
            # Update last active time
            user.last_active = timezone.now()
            user.save(update_fields=['last_active'])
class RequestLoggingMiddleware(MiddlewareMixin):
    """Middleware for logging requests."""

    def process_request(self, request):
        """Record when handling of this request started."""
        request.start_time = time.time()

    def process_response(self, request, response):
        """Log method, path, status code and duration; return the response.

        Nothing is logged when ``process_request`` did not run for this
        request (no ``start_time`` attribute).
        """
        # Imported here because the surrounding module defines no logger.
        import logging

        if hasattr(request, 'start_time'):
            duration = time.time() - request.start_time
            # Lazy %-style arguments: the message is only formatted when the
            # INFO level is enabled. The rendered text is unchanged.
            logging.getLogger(__name__).info(
                '%s %s - %s - %.3fs',
                request.method,
                request.path,
                response.status_code,
                duration,
            )
        return response
性能优化
N+1 查询预防
# Bad - N+1 queries
# One query for the products, then one more per product for its category.
products = Product.objects.all()
for product in products:
    print(product.category.name)  # Separate query for each product

# Good - Single query with select_related
# The category is joined into the product query, so the loop adds no queries.
products = Product.objects.select_related('category').all()
for product in products:
    print(product.category.name)

# Good - Prefetch for many-to-many
# Tags are loaded up front, so `product.tags.all()` does not query per product.
products = Product.objects.prefetch_related('tags').all()
for product in products:
    for tag in product.tags.all():
        print(tag.name)
数据库索引
class Product(models.Model):
    """Index example: all non-unique indexes are declared in ``Meta.indexes``."""

    # No `db_index=True` here: together with the Meta entry below it would
    # create a second, identical index on `name`.
    name = models.CharField(max_length=200)
    slug = models.SlugField(unique=True)
    category = models.ForeignKey('Category', on_delete=models.CASCADE)
    created_at = models.DateTimeField(auto_now_add=True)

    class Meta:
        indexes = [
            models.Index(fields=['name']),
            models.Index(fields=['-created_at']),
            models.Index(fields=['category', 'created_at']),
        ]
批量操作
from decimal import Decimal

# Bulk create
# Prices are given as Decimal rather than as a float literal.
Product.objects.bulk_create([
    Product(name=f'Product {i}', price=Decimal('10.00'))
    for i in range(1000)
])

# Bulk update
# Materialize the slice first so the instances modified in the loop are
# explicitly the same ones handed to bulk_update.
products = list(Product.objects.all()[:100])
for product in products:
    product.is_active = True
Product.objects.bulk_update(products, ['is_active'])

# Bulk delete
Product.objects.filter(stock=0).delete()
快速参考
| 模式 | 描述 |
|---|---|
| 拆分设置 | 分离开发/生产/测试设置 |
| 自定义 QuerySet | 可重用的查询方法 |
| 服务层 | 业务逻辑分离 |
| ViewSet | REST API 端点 |
| 序列化器验证 | 请求/响应转换 |
| select_related | 外键优化 |
| prefetch_related | 多对多优化 |
| 缓存优先 | 缓存昂贵操作 |
| 信号 | 事件驱动操作 |
| 中间件 | 请求/响应处理 |
请记住:Django 提供了许多快捷方式,但对于生产应用程序来说,结构和组织比简洁的代码更重要。为可维护性而构建。
You Might Also Like
Related Skills

verify
243K
Use when you want to validate changes before committing, or when you need to check all React contribution requirements.
facebook
test
243K
Use when you need to run tests for React core. Supports source, www, stable, and experimental channels.
facebook
feature-flags
243K
Use when feature flag tests fail, flags need updating, understanding @gate pragmas, debugging channel-specific test failures, or adding new flags to React.
facebook
extract-errors
243K
Use when adding new error messages to React, or seeing "unknown error code" warnings.
facebook