# Files
# peko-java/python/peko_user_in_room_current.py
# 2024-01-03 15:22:16 +08:00
#
# 103 lines
# 3.3 KiB
# Python

import redis
class RedisUtil:
    """Small helper that runs individual Redis commands over short-lived clients.

    Each public method builds a fresh ``redis.StrictRedis`` client from the
    stored connection settings, issues its command (``scan`` issues one per
    page), and closes the client afterwards. Exceptions raised while a command
    runs are printed and swallowed, so callers receive the method's fallback
    value (``None``, or the keys gathered so far for ``scan``) instead of an
    exception.
    """

    def __init__(self, host, port, db, password):
        """Store the connection settings; no client is created here.

        Args:
            host: Forwarded as ``host`` to ``redis.StrictRedis``.
            port: Forwarded as ``port`` to ``redis.StrictRedis``.
            db: Forwarded as ``db`` to ``redis.StrictRedis``.
            password: Forwarded as ``password`` to ``redis.StrictRedis``.
        """
        self.host = host
        self.port = port
        self.db = db
        self.password = password

    def _client(self):
        """Build a new client from the stored connection settings."""
        return redis.StrictRedis(
            host=self.host,
            port=self.port,
            db=self.db,
            password=self.password,
        )

    def _run(self, label: str, name: str, command):
        """Run ``command(client)`` on a fresh client, printing any error.

        Args:
            label: Command name used as the prefix of the error message.
            name: Redis key the command targets; echoed in the error message.
            command: Callable that takes the client and returns the reply.

        Returns:
            Whatever ``command`` returns, or ``None`` if it raised.
        """
        client = self._client()
        try:
            return command(client)
        except Exception as e:
            # Best-effort by design: report the failure and carry on.
            print(f'{label} ', name, ' is error.', e)
            return None
        finally:
            client.close()

    def scan(self, match: str):
        """Collect every key matching ``match`` by paging through SCAN.

        Args:
            match: Glob-style pattern passed as ``match`` to ``scan``.

        Returns:
            List of matching keys. If a page fails, the error is printed and
            the keys gathered up to that point are returned.
        """
        keys = []
        client = self._client()
        try:
            cursor = 0
            while True:
                cursor, page = client.scan(cursor=cursor, match=match, count=1000)
                keys.extend(page)
                # A returned cursor of 0 marks the end of the iteration.
                if cursor == 0:
                    break
        except Exception as e:
            print('scan is error.', e)
        finally:
            client.close()
        return keys

    def delete(self, name: str):
        """Delete key ``name``; errors are printed, nothing is returned."""
        self._run('delete', name, lambda client: client.delete(name))

    def hgetall(self, name: str):
        """Return all fields and values of hash ``name``, or ``None`` on error."""
        return self._run('hgetall', name, lambda client: client.hgetall(name))

    def hincrby(self, name: str, field, increment):
        """Increment ``field`` of hash ``name`` by ``increment``.

        Errors are printed; nothing is returned.
        """
        self._run('hincrby', name, lambda client: client.hincrby(name, field, increment))

    def hdel(self, name: str, field):
        """Remove ``field`` from hash ``name``; errors are printed, nothing is returned."""
        self._run('hdel', name, lambda client: client.hdel(name, field))

    def hget(self, name: str, field):
        """Return the value of ``field`` in hash ``name``, or ``None`` on error."""
        return self._run('hget', name, lambda client: client.hget(name, field))

    def zrange(self, name: str, start: int, end: int, withscores: bool):
        """Return members of sorted set ``name`` between ``start`` and ``end``.

        Args:
            name: Sorted-set key.
            start: Forwarded as ``start`` to ``zrange``.
            end: Forwarded as ``end`` to ``zrange``.
            withscores: Forwarded as ``withscores`` to ``zrange``.

        Returns:
            The reply from ``zrange``, or ``None`` if the command raised.
        """
        return self._run(
            'zrange',
            name,
            lambda client: client.zrange(name, start=start, end=end, withscores=withscores),
        )
if __name__ == '__main__':
    # One-off cleanup: remove every field of the hash below whose name is not
    # wrapped in square brackets.
    # NOTE(review): connection credentials are hard-coded here; consider
    # loading them from the environment or a secrets store instead.
    # yinmeng
    # util = RedisUtil(host='172.16.0.2', port=6379, db=0, password='mehpec-vybgig-Demri6')
    # piko
    util = RedisUtil(host='172.22.0.8', port=6379, db=0, password='pc8DphhaXwTe2jyv')
    hash_name = 'peko_user_in_room_current'
    # hgetall returns None when the read failed; treat that as nothing to clean
    # instead of crashing on iteration.
    members = util.hgetall(hash_name) or {}
    for member in members:
        key = member.decode('utf-8')
        # Fields shaped like "[...]" are kept as they are.
        if key.startswith('[') and key.endswith(']'):
            continue
        print(key)
        util.hdel(hash_name, key)