import json import threading import time from django.core.cache import cache from django_redis import get_redis_connectioncnn = get_redis_connection()def acquire_lock(lock_key='lock_key', acquire_timeout=30, lock_timeout=30):"""尝试获取锁:param lock_key::param acquire_timeout::param lock_timeout::return:"""end = time.time() + acquire_timeoutwhile time.time() < end:if cache.add(lock_key, 'locked', lock_timeout):return Truetime.sleep(0.1)return Falsedef release_lock(lock_key='lock_key'):"""释放锁:param lock_key: 锁的键:return:"""cache.delete(lock_key)cache_queue_key = 'queue_key'class TaskQueue:def get_index0(self):if acquire_lock():try:ret = cnn.lindex(cache_queue_key, 0)if ret:return json.loads(ret)return {}finally:release_lock()return Nonedef get(self):if acquire_lock():try:ret = cnn.lpop(cache_queue_key)if ret:return json.loads(ret)return retfinally:release_lock()return Nonedef put(self, item):if acquire_lock():try:return cnn.rpush(cache_queue_key, json.dumps(item))finally:release_lock()def get_queue_len(self):if acquire_lock():try:return cnn.llen(cache_queue_key)finally:release_lock()return 0task_queue = TaskQueue()def start_queue():print('start free queue')while True:item = task_queue.get()# 处理队列passtime.sleep(0.1)# 作为守护进程启动 def threading_start_queue():threading.Thread(target=start_queue, daemon=True).start()go_start_queue = threading_start_queue()