test_redis_sentinel_handle.py 2 KB
Newer Older
qunfeng qiu's avatar
qunfeng qiu committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# coding=utf8
"""
@Version: 1.0
@Python Version:3.6.6
@Author: wenhx8
@File: test_redis_sentinel_handle.py
@Time: 2019/6/19 019 13:04
@Description: Tests RedisHandle set/get/delete/increment operations using a Redis Sentinel connection
"""

from unittest import TestCase, main

from cucc_common_pkg.redishanlde import RedisHandle
from cucc_common_pkg.globalconst import RedisConnConst, RedisSentinelConnConst, RedisConst
import re


class TestUtility(TestCase):
    """
    Integration test for ``RedisHandle`` using a Redis Sentinel connection dict.

    NOTE(review): the test uses the hard-coded Sentinel hosts and credentials
    below, so it presumably needs network access to that environment --
    confirm before running it in CI.
    """

    def test_sentinel_get_vals(self):
        """
        Exercise set / get / delete / increment through ``RedisHandle``.

        Steps:
          1. ``set_vals`` stores two string values and ``get_vals`` must
             return them.
          2. ``del_key`` is called and a second ``get_vals`` must no longer
             return them.
          3. ``set_vals`` stores two integers and ``set_key_incr`` must return
             each of them increased by one.

        :return: None
        """
        # NOTE(review): the third positional argument (True) passed to every
        # RedisHandle call presumably selects Sentinel mode -- confirm against
        # the RedisHandle signatures.
        dict_conn = {
            RedisSentinelConnConst.CONN_HOSTS: [
                ('10.245.47.27', '26379'),
                ('10.245.47.28', '26379'),
                ('10.245.47.29', '26379'),
            ],
            RedisSentinelConnConst.CONN_CLUSTERNAME: "proj_dev_master",
            RedisConnConst.CONN_AUTH: "!@#rjyjy",
            RedisConnConst.CONN_DB: "15",
            # RedisConst.EXPIRE_TIME: "30",
        }

        # Key/value pairs handed to set_vals.
        dict_set = {
            "test_get_vals_k_1": '温',
            "test_get_vals_k_2": 'asd阿萨德',
        }

        # Stored key -> name under which the assertions below look the value
        # up in the dict returned by get_vals.
        dict_key = {
            'test_get_vals_k_1': 'zxc',
            'test_get_vals_k_2': 'zxc111',
        }
        rh = RedisHandle()

        # 1. Write the values, read them back and compare as strings.
        rh.set_vals(dict_set, dict_conn, True)
        dict_ret = rh.get_vals(dict_key, dict_conn, True)

        for k, v in dict_set.items():
            self.assertEqual(str(v), str(dict_ret[dict_key[k]]))

        # 2. After deletion, none of the result names may be present.
        rh.del_key(dict_key, dict_conn, True)

        dict_ret = rh.get_vals(dict_key, dict_conn, True)
        for k in dict_set:
            self.assertNotIn(dict_key[k], dict_ret)

        # 3. Increment: each returned value must be the stored value plus one.
        # NOTE(review): these keys are not deleted afterwards; they are
        # rewritten by set_vals on every run.
        dict_set_incr = {
            "test_inrc_vals_k_1": -1,
            "test_inrc_vals_k_2": 2,
        }

        rh.set_vals(dict_set_incr, dict_conn, True)
        dict_ret = rh.set_key_incr(dict_set_incr, dict_conn, True)
        for k, v in dict_set_incr.items():
            self.assertEqual(int(v) + 1, int(dict_ret[k]))



# When executed as a script, hand control to unittest's main() (imported above).
if __name__ == '__main__':
    main()