| 12345678910111213141516171819202122232425262728293031323334 |
- import json
- import redis
- from pydantic import BaseModel
- import random
- class RedisRecoveryData(BaseModel):
- code: int
- RECOVERY_REDIS_KEY = "pin_recovery_{user_id}"
- def generate_recovery_key(user_id:int):
- pin_code = str(random.randint(100000, 999999))
- redis_client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
- redis_client.set(RECOVERY_REDIS_KEY.format(user_id=user_id), json.dumps({"code": pin_code}))
- redis_client.expire(RECOVERY_REDIS_KEY.format(user_id=user_id), 900) # Expire in 15 minutes
- return pin_code
- def add_token(user_id, token):
- redis_client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
- redis_client.delete(RECOVERY_REDIS_KEY.format(user_id=user_id))
- redis_client.set(RECOVERY_REDIS_KEY.format(user_id=user_id), json.dumps({"token": token}))
- def get_token(user_id):
- redis_client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
- data = redis_client.get(RECOVERY_REDIS_KEY.format(user_id=user_id))
- json_data = json.loads(str(data)) if data else None
- return json_data.get("token") if json_data else None
- def get_recovery_data(user_id: int) -> RedisRecoveryData:
- redis_client = redis.Redis(host='localhost', port=6379, db=0,decode_responses=True)
- data = redis_client.get(RECOVERY_REDIS_KEY.format(user_id=user_id))
- json_data = json.loads(str(data)) if data else None
- if json_data:
- return RedisRecoveryData(**json_data)
- return RedisRecoveryData(code=-1)
|