【总结】自动化测试-工具1-一键删除

  • 2022-01-18
  • Admin

delete_micro_service.py

import os

try:
    import requests
except:
    os.system('pip3 install requests')
try:
    import json
except:
    os.system('pip3 install json')


def delete_micro_service(swagger_base: str, service_name: str, version: str):
    # 'http://10.5.6.55:80/api/msdiscover/v1/services/zte-fin-revenue-billing/version/v6396001002'
    url = '{0}/{1}/version/{2}'.format(swagger_base, service_name, version)
    print('url:', url)
    resp = requests.delete(url)
    print(resp)
    if '204' in str(resp):
        print('success!')


def delete_micro_services(swagger_base: str, service_list: list, version_list: list):
    for service_name in service_list:
        for version in version_list:
            delete_micro_service(swagger_base, service_name, version)
    return None


def query_micro_service(swagger_base: str, service_name: str, version='v1'):
    if not version:
        url = '{0}/{1}'.format(swagger_base, service_name)
    else:
        url = '{0}/{1}/version/{2}'.format(swagger_base, service_name, version)
    print('url:', url)
    try:
        resp = requests.get(url).json()
    except:
        resp = None
    print(resp)
    return resp


def delete_micro_service(swagger_base: str, service_name: str, version: str, ip: str, port: str):
    # 'http://10.5.6.55:80/api/msdiscover/v1/services/zte-fin-revenue-billing/version/v1/nodes/172.17.0.2/8080'
    url = '{0}/{1}/version/{2}/nodes/{3}/{4}'.format(swagger_base, service_name, version, ip, port)
    print('url:', url, 'method:', 'delete')
    resp = requests.delete(url)
    print(resp)
    if '204' in str(resp):
        print('success!')
    return None


def delete_micro_service_exclude_v1(swagger_base='http://10.5.6.55:80/api/msdiscover/v1/services',
                                    service_name='zte-fin-revenue-billing'):
    service_list = query_micro_service(swagger_base, service_name, None)
    for service in service_list:
        version = service.get('version')
        if 'v1' != str(version):
            for node in service.get('nodes'):
                ip = node.get('ip')
                port = node.get('port')
                delete_micro_service(swagger_base, service_name, version, ip, port)
    return None


def delete_micro_service_v1(swagger_base='http://10.5.6.55:80/api/msdiscover/v1/services',
                            service_name='zte-fin-revenue-billing', exclude_ip=['10.5.17.145']):
    service = query_micro_service(swagger_base, service_name, 'v1')
    if not service:
        return
    version = service.get('version')
    if 'v1' == str(version):
        for node in service.get('nodes'):
            ip = node.get('ip')
            port = node.get('port')
            if ip not in exclude_ip:
                delete_micro_service(swagger_base, service_name, version, ip, port)


def query_micro_services(swagger_base='http://10.5.6.55:80/api/msdiscover/v1/services', service_list=[], version='v1'):
    result = dict()
    if service_list:
        for service_name in service_list:
            service_info = query_micro_service(swagger_base, service_name, version)
            service_versions = dict()
            if service_info and isinstance(service_info, dict):
                service_info = [service_info]
            if service_info and isinstance(service_info, list):
                for service in service_info:
                    version = service.get('version')
                    nodes = service.get('nodes')
                    service_nodes = []
                    if isinstance(nodes, list) and nodes:
                        for node in nodes:
                            ip = node.get('ip')
                            port = node.get('port')
                            service_nodes.append({'ip': ip, 'port': port})
                    service_versions.update({version: service_nodes})
            else:
                pass
            result.update({service_name: service_versions})
        # print(result)
        print(json.dumps(result, ensure_ascii=False, indent=4))


if __name__ == '__main__':
    # delete_micro_service('http://10.5.6.55:80/api/msdiscover/v1/services', 'zte-fin-revenue-billing', 'v6396001051')
    swagger_base = 'http://10.5.6.55:80/api/msdiscover/v1/services'
    service_list = ['zte-fin-revenue-billing', 'zte-fin-revenue-public', 'zte-fin-revenue-portal',
                    'zte-fin-revenue-erp', 'zte-fin-revenue-cmsold', 'zte-bmt-rbc-sync', 'zte-bmt-rbc-util'
        , 'zte-bmt-rbc-collectionclient', 'zte-bmt-rbc-collectiongateway', 'zte-bmt-rbc-collectionmodel',
                    'zte-bmt-rbc-collectionverification'
        , 'zte-fin-revenue-public-junit', 'zte-fin-revenue-cmsold-junit', 'zte-bmt-rbc-sync_junit',
                    'zte-bmt-rbc-util_junit', 'zte-bmt-rbc-collectionverification_junit',
                    'zte-bmt-rbc-collectionmodel_junit', 'zte-bmt-rbc-collectionclient_junit',
                    'zte-bmt-rbc-collectionex_junit', 'zte-bmt-rbc-collectioncommon', 'zte-bmt-rbc-collectionex']
    for service_name in service_list:
        delete_micro_service_exclude_v1(swagger_base, service_name)
    for service_name in service_list:
        delete_micro_service_v1(swagger_base, service_name, ['10.5.17.145', '10.5.18.159'])
    # query_micro_services(swagger_base, service_list, input('version:'))
 

原文:https://blog.csdn.net/YCYloveAndy/article/details/122561673

联系站长

QQ:769220720