test_redishandle.py 3.25 KB
Newer Older
qunfeng qiu's avatar
qunfeng qiu committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# coding=utf8
"""
@Version: 1.0
@Python Version:3.6.6
@Author: wenhx8
@File: test_redishandle.py.py
@Time: 2019/6/19 019 13:04
@Description: 处理用户信息
"""

from unittest import TestCase, main

from cucc_common_pkg.redishanlde import RedisHandle
from cucc_common_pkg.globalconst import RedisConnConst, RedisSentinelConnConst, RedisConst
#import simplejson as json
import simplejson as json
import re


class TestUtility(TestCase):
    r'''
    TestUtility
    '''


    def test_redis_get_vals(self):
        r"""
        测试获取redis值
        :return:
        """


        dict_conn_master = {
            RedisConnConst.CONN_IP: "10.245.47.30"
            , RedisConnConst.CONN_PORT: "32379"
            , RedisConnConst.CONN_AUTH: "!@#rjyjy"
            , RedisConnConst.CONN_DB: "15"
            #, RedisConst.EXPIRE_TIME: "30"
        }

        dict_conn_slave = {
            RedisConnConst.CONN_IP: "10.245.47.30"
            , RedisConnConst.CONN_PORT: "32380"
            , RedisConnConst.CONN_AUTH: "!@#rjyjy"
            , RedisConnConst.CONN_DB: "9"
            #, RedisConst.EXPIRE_TIME: "30"
        }

        dict_set = {
            "test_get_vals_k_1": '温',
            "test_get_vals_k_2": 'asd阿萨德'
        }

        dk = {
            "f243e18ce2c411e9835c0a58c0a80327":"f243e18ce2c411e9835c0a58c0a80327"

        }

        dict_key = {'test_get_vals_k_1': 'zxc',
                    'test_get_vals_k_2': 'zxc111'
                    }
        dict_key["asd"]=[dict_set]

        dict_key_set = {"zxczxczx": json.dumps(dict_key,ensure_ascii=False)
                    }

        # dict_key_set = {"zxczxczx":dict_key                                 }
        rh = RedisHandle()

        rh.set_vals(dict_key_set, dict_conn_master)

        dict_ret = rh.get_vals({"zxczxczx": "zxczxczx"}, dict_conn_master)

        asdasd=dict_ret["zxczxczx"]

        ret_json = {
            'RetCode': '1',
            'RetVal': '1'
        }

        ret_json["asd"]=json.loads(asdasd)

        dict_ret = rh.get_vals(dk, dict_conn_master)

        ret_json = {
            'RetCode': '1',
            'RetVal': '1'
        }
        list = []
        asd =  dict_ret["f243e18ce2c411e9835c0a58c0a80327"]
        print(isinstance(asd,str))
        print(isinstance(asd, dict))
        list.append(json.loads(asd))
        ret_json["Datarow"] = list

        redis_kv = { }
        redis_kv['f243e18ce2c411e9835c0a58c0a80327']=json.dumps(ret_json,ensure_ascii=False)


        rh.set_vals(redis_kv, dict_conn_master)
        dict_ret = rh.get_vals(dict_key, dict_conn_slave)

        for k, v in dict_set.items():
            self.assertEqual(str(v), str(dict_ret[dict_key[k]]))

        rh.del_key(dict_key, dict_conn_master)

        dict_ret = rh.get_vals(dict_key, dict_conn_slave)
        for k, v in dict_set.items():
            self.assertFalse(dict_key[k] in dict_ret)

        dict_set_incr = {
            "test_inrc_vals_k_1": -1,
            "test_inrc_vals_k_2": 2
        }

        rh.set_vals(dict_set_incr, dict_conn_master)
        dict_ret = rh.set_key_incr(dict_set_incr, dict_conn_master)
        for k, v in dict_set_incr.items():
            self.assertEqual(int(v)+1, int(dict_ret[k]))


if __name__ == '__main__':
    main()