Python使用Redis,Redis连接池的用法
直接上代码:
import redis, traceback, sys
from loguru import logger
class RedisClient:
__instance = None
def __new__(cls, *args, **kwargs):
if not cls.__instance:
return object.__new__(cls)
return cls.__instance
def __init__(self, host, port, password=None):
try:
# 拿到一个Redis实例的连接池,避免每次建立、释放连接的开销,节省了每次连接用的时间
self.POLL = redis.ConnectionPool(host=host,
port=port,
decode_responses=True,
db=0,
password=password,
max_connections=100)
logger.info(f'获取Redis连接池, Host={host}, Port={port}')
except Exception as e:
logger.error(f'获取Redis连接池异常, 程序退出:{str(e)},traceback={traceback.format_exc()}')
sys.exit(0)
def get_redis_client(self):
try:
# 从连接池中获取一个连接实例
redis_conn = redis.StrictRedis(connection_pool=self.POLL)
if redis_conn.ping():
logger.info(f'获取Redis连接实例成功')
return redis_conn
except Exception as e:
logger.error(f'Redis连接异常:{str(e)},traceback={traceback.format_exc()}')
上述是封装了一个RedisClient的类,使用了
redis.ConnectionPool 连接池,
这样能做到,大概的意思即:拿到一个Redis实例的连接池,避免每次建立、释放连接的开销,节省了每次连接用的时间,文中设置了最大100个。在获取实例做连接时,使用了
redis.StrictRedis(connection_pool=self.POLL)
文中使用了loguru记录日志。
在实际使用时,如下:
from tools.redisClient import RedisClient
import sqlite3, json
from config import Redis_Host, Redis_Port, Redis_pwd
redis_obj = RedisClient(Redis_Host, Redis_Port, Redis_pwd)
# 保存特征值到Redis指定的name、key中
def SaveFeatsToRedis(name, key, data):
redis_connect = redis_obj.get_redis_client()
redis_connect.hset(
name=name,
key=key,
value=json.dumps(data, ensure_ascii=False)
)
if __name__ == '__main__':
data, count = GetFeatsFromSqlite(100)
SaveFeatsToRedis(data)
redis_connect = redis_obj.get_redis_client()
print('查询Redis中所有的hkey', redis_connect.keys())
导入RedisClient,调用此类中的
get_redis_client() 方法,然后就是调用get、set即可。