我的编程空间,编程开发者的网络收藏夹
学习永远不晚

python grpc 应用

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

python grpc 应用

1、rpc介绍

2、grpc

3、基于grpc协议文件传输

4、基于grpc协议jmeter压测获取实时结果

5、基于grcp协议获取jmeter最终压测报告,并将报告保存至client端

6、压测中途停止jmeter

grpc server (jmeter server)  192.168.18.128
rpc client (本机)

1、rpc协议介绍

RPC(Remote Procedure Call Protocol)——远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据。在OSI网络通信模型中,RPC跨越了传输层和应用层。RPC使得开发包括网络分布式多程序在内的应用程序更加容易。

RPC采用客户机/服务器模式。请求程序就是一个客户机,而服务提供程序就是一个服务器。首先,客户机调用进程发送一个有进程参数的调用信息到服务进程,然后等待应答信息。在服务器端,进程保持睡眠状态直到调用信息到达为止。当一个调用信息到达,服务器获得进程参数,计算结果,发送答复信息,然后等待下一个调用信息,最后,客户端调用进程接收答复信息,获得进程结果,然后调用执行继续进行。 -- 摘自 百度百科

2、grpc介绍

gRPC 是一款高性能、开源的 RPC 框架,产自 Google,基于 ProtoBuf 序列化协议进行开发,支持多种语言(Golang、Python、Java等),本篇只介绍 Python 的 gRPC 使用。因为 gRPC 对 HTTP/2 协议的支持使其在 Android、IOS 等客户端后端服务的开发领域具有良好的前景。gRPC 提供了一种简单的方法来定义服务,同时客户端可以充分利用 HTTP2 stream 的特性,从而有助于节省带宽、降低 TCP 的连接次数、节省CPU的使用等。

3、基于grpc协议文件传输

目标:将本地的文件(test.file)传输至rpc server (192.168.18.128)

python 版本 2.7.13

3.1、

gRPC 的安装:

[root@vm6 rpc]# pip2.7 install requests
[root@vm6 rpc]# pip2.7  install grpcio

安装 ProtoBuf 相关的 python 依赖库:

[root@vm6 rpc]# pip2.7 install protobuf

安装 python grpc 的 protobuf 编译工具:

[root@vm6 rpc]# pip2.7 install grpcio-tools

定义 gRPC 接口:

[root@vm6 ~]# mkdir rpc rpc/__init__.py
[root@vm6 ~]# cd rpc/
[root@vm6 rpc]# vim rpc.proto 
// grpc 版本
syntax = "proto3";   
package rpc;

//定义接口
service RPC {
//相当于定义接口方法 
    rpc sendConfFile(Content) returns (Status) {}
}

//相当于定义类属性,此属性用于接受文本
message  Content {
  string  text = 1;
}
//此属性用于return 结果状态吗
message Status {
    int64 code = 1;
}

编译 protobuf:

[root@vm6 rpc]# python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. ./rpc.proto

server 端代码:

# -*- encoding=utf-8 -*-
from concurrent import futures
import time
import subprocess
import codecs
import sys
import os

import grpc
import rpc_pb2
import rpc_pb2_grpc

_ONE_DAY_IN_SECONDS = 60 * 60 * 24
#server端文件保存的位置
jmeter_config = os.path.join(os.getcwd(),r'conf/config.jmx')

class Performance(rpc_pb2_grpc.RPCServicer):

    def sendConfFile(self, content,context):
        ''' 保存配置文件,如config.jmx '''
        text = content.text
        try:
            print jmeter_config
            conf_handle = codecs.open(jmeter_config,'w',encoding='utf-8')
            conf_handle.write(text)
            return rpc_pb2.Status(code=0)
        except Exception,e:
            print e
            return rpc_pb2.Status(code=1)

def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    rpc_pb2_grpc.add_RPCServicer_to_server(Performance(),server)
    server.add_insecure_port('[::]:50051')
    server.start()
    try:
        while True:
            time.sleep(_ONE_DAY_IN_SECONDS)
    except KeyboardInterrupt:
        server.stop(0)


if __name__ == '__main__':
    serve()

client 端代码:

# -*- encoding=utf-8 -*-
from concurrent import futures
import time
import subprocess
import codecs
import sys
import os
import logging
import json
import grpc
import rpc_pb2
import rpc_pb2_grpc

logging.basicConfig(level=logging.DEBUG,
    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
    datefmt='%a, %d %b %Y %H:%M:%S',
    filename='rpc.log',
    filemode='a+')

rpc_server = r'192.168.18.128'
rpc_port = '50051'

class performance():

  '''性能测试的客户端接口'''

  def __init__(self,ip,port):

    '''初始化,连接RPC服务'''

    logging.info("performance_client init")
    conn = grpc.insecure_channel(ip + ':' + port)
    self.stub_client = rpc_pb2_grpc.RPCStub(channel=conn)

  def sendConfig(self,filename):
      file_handle = codecs.open(filename, 'r', encoding='utf-8')
      content = file_handle.read()
      '''向RPC server发送测试的配置文件'''
      response =  self.stub_client.sendConfFile(rpc_pb2.Content(text=content))
      print response.code

if __name__ == '__main__':
    client = performance(rpc_server,rpc_port)
    ##将本地的rpc_client.py文件当做测试文件发送到server端
    client.sendConfig('rpc_client.py')

启动server端:

[root@vm6 rpc]# python rpc_server.py

启动client端:

D:\xisuo\rpc>python rpc_client.py
0

查看server端的文件:

[root@vm6 rpc]# more conf/config.jmx 
# -*- encoding=utf-8 -*-
from concurrent import futures
import time
import subprocess
import codecs
import sys
import os
import logging
。。。略

4、基于grpc协议jmeter压测获取实时结果

4.1、server端部署jdk,jmeter

将jdk解压到/usr/local/ 配置环境变量

将jmeter解压到/usr/local

4.2、因为我们是有jmeter的no gui模式在Linux执行,故需要jmeter的jmx文件,我们可以在本地使用gui模式先生成jmx文件

最终结果文件为 test.jmx,内容如下:

<?xml version="1.0" encoding="UTF-8"?>
<jmeterTestPlan version="1.2" properties="2.4" jmeter="2.9 r1437961">
  <hashTree>
    <TestPlan guiclass="TestPlanGui" testclass="TestPlan" testname="测试计划" enabled="true">
      <stringProp name="TestPlan.comments"></stringProp>
      <boolProp name="TestPlan.functional_mode">false</boolProp>
      <boolProp name="TestPlan.serialize_threadgroups">false</boolProp>
      <elementProp name="TestPlan.user_defined_variables" elementType="Arguments" guiclass="ArgumentsPanel" testclass="Arguments" testname="用户定义的变量" enabled="true">
        <collectionProp name="Arguments.arguments"/>
      </elementProp>
      <stringProp name="TestPlan.user_define_classpath"></stringProp>
    </TestPlan>
    <hashTree>
      <ThreadGroup guiclass="ThreadGroupGui" testclass="ThreadGroup" testname="测试" enabled="true">
        <stringProp name="TestPlan.comments">测试</stringProp>
        <stringProp name="ThreadGroup.on_sample_error">continue</stringProp>
        <elementProp name="ThreadGroup.main_controller" elementType="LoopController" guiclass="LoopControlPanel" testclass="LoopController" testname="循环控制器" enabled="true">
          <boolProp name="LoopController.continue_forever">false</boolProp>
          <stringProp name="LoopController.loops">50</stringProp>
        </elementProp>
        <stringProp name="ThreadGroup.num_threads">2</stringProp>
        <stringProp name="ThreadGroup.ramp_time">1</stringProp>
        <longProp name="ThreadGroup.start_time">1496464278000</longProp>
        <longProp name="ThreadGroup.end_time">1496464278000</longProp>
        <boolProp name="ThreadGroup.scheduler">false</boolProp>
        <stringProp name="ThreadGroup.duration"></stringProp>
        <stringProp name="ThreadGroup.delay"></stringProp>
      </ThreadGroup>
      <hashTree>
        <HTTPSamplerProxy guiclass="HttpTestSampleGui" testclass="HTTPSamplerProxy" testname="HTTP请求" enabled="true">
          <elementProp name="HTTPsampler.Arguments" elementType="Arguments" guiclass="HTTPArgumentsPanel" testclass="Arguments" testname="用户定义的变量" enabled="true">
            <collectionProp name="Arguments.arguments"/>
          </elementProp>
          <stringProp name="HTTPSampler.domain">lansgg.blog.51cto.com</stringProp>
          <stringProp name="HTTPSampler.port">80</stringProp>
          <stringProp name="HTTPSampler.connect_timeout"></stringProp>
          <stringProp name="HTTPSampler.response_timeout"></stringProp>
          <stringProp name="HTTPSampler.protocol"></stringProp>
          <stringProp name="HTTPSampler.contentEncoding"></stringProp>
          <stringProp name="HTTPSampler.path"></stringProp>
          <stringProp name="HTTPSampler.method">GET</stringProp>
          <boolProp name="HTTPSampler.follow_redirects">true</boolProp>
          <boolProp name="HTTPSampler.auto_redirects">false</boolProp>
          <boolProp name="HTTPSampler.use_keepalive">true</boolProp>
          <boolProp name="HTTPSampler.DO_MULTIPART_POST">false</boolProp>
          <stringProp name="HTTPSampler.implementation">HttpClient4</stringProp>
          <boolProp name="HTTPSampler.monitor">false</boolProp>
          <stringProp name="HTTPSampler.embedded_url_re"></stringProp>
          <stringProp name="TestPlan.comments">HTTP请求</stringProp>
        </HTTPSamplerProxy>
        <hashTree/>
        <ResultCollector guiclass="ViewResultsFullVisualizer" testclass="ResultCollector" testname="察看结果树" enabled="true">
          <boolProp name="ResultCollector.error_logging">false</boolProp>
          <objProp>
            <name>saveConfig</name>
            <value class="SampleSaveConfiguration">
              <time>true</time>
              <latency>true</latency>
              <timestamp>true</timestamp>
              <success>true</success>
              <label>true</label>
              <code>true</code>
              <message>true</message>
              <threadName>true</threadName>
              <dataType>true</dataType>
              <encoding>false</encoding>
              <assertions>true</assertions>
              <subresults>true</subresults>
              <responseData>false</responseData>
              <samplerData>false</samplerData>
              <xml>false</xml>
              <fieldNames>false</fieldNames>
              <responseHeaders>false</responseHeaders>
              <requestHeaders>false</requestHeaders>
              <responseDataOnError>false</responseDataOnError>
              <saveAssertionResultsFailureMessage>false</saveAssertionResultsFailureMessage>
              <assertionsResultsToSave>0</assertionsResultsToSave>
              <bytes>true</bytes>
            </value>
          </objProp>
          <stringProp name="TestPlan.comments">察看结果树</stringProp>
          <stringProp name="filename"></stringProp>
        </ResultCollector>
        <hashTree/>
      </hashTree>
    </hashTree>
  </hashTree>
</jmeterTestPlan>

当我们在linux终端上执行时,结果如下,

[root@vm6 rpc]# /usr/local/apache-jmeter-3.2/bin/jmeter -n -t test.jmx -l text.jtl

Creating summariser <summary>
Created the tree successfully using test.jmx
Starting the test @ Sat Jun 03 20:41:25 CST 2017 (1496493685813)
Waiting for possible Shutdown/StopTestNow/Heapdump message on port 4445
summary +     46 in 00:00:18 =    2.5/s Avg:  2745 Min:   423 Max:  3135 Err:     0 (0.00%) Active: 49 Started: 50 Finished: 1
summary +     54 in 00:00:08 =    7.2/s Avg:  2018 Min:   413 Max:  8000 Err:     0 (0.00%) Active: 0 Started: 50 Finished: 50
summary =    100 in 00:00:26 =    3.9/s Avg:  2353 Min:   413 Max:  8000 Err:     0 (0.00%)
Tidying up ...    @ Sat Jun 03 20:42:07 CST 2017 (1496493727553)
... end of run

而我们要将这些结果实时的在rpc client展示出来

接口文件:

syntax = "proto3";
package rpc;


service RPC {
// send config jmx
    rpc sendConfFile(Content) returns (Status) {}
//  run jmeter test
    rpc runJMeter(Content) returns (stream Content) {}
}

message  Content {
  string  text = 1;
}

message Status {
    int64 code = 1;
}

server端:

# -*- encoding=utf-8 -*-
from concurrent import futures
import time
import subprocess
import codecs
import sys
import os
import json
import logging
import grpc
import rpc_pb2
import rpc_pb2_grpc

logging.basicConfig(level=logging.DEBUG,
    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
    datefmt='%a, %d %b %Y %H:%M:%S',
    filename='rpc_server.log',
    filemode='a+')

_ONE_DAY_IN_SECONDS = 60 * 60 * 24
jmeter_config = os.path.join(os.getcwd(),r'conf/config.jmx')
jmeter_path = r'/usr/local/apache-jmeter-3.2/bin/jmeter'
jmeter_result = os.path.join(os.getcwd(),r'result/result.jtl')

class Performance(rpc_pb2_grpc.RPCServicer):

    def sendConfFile(self, content,context):
        ''' 保存配置文件,如config.jmx '''
        text = content.text
        try:
            conf_handle = codecs.open(jmeter_config,'w',encoding='utf-8')
            conf_handle.write(text)
            logging.info("sendConfFile Success!")
            return rpc_pb2.Status(code=0)
        except Exception,e:
            print e
            return rpc_pb2.Status(code=1)

    def runJMeter(self, content,context):
        logging.info("begin runJmeter.")
        '''执行测试任务,并将实时结果返回'''
        iplist = content.text
        if iplist:
            cmd = jmeter_path + " -n -t " + jmeter_config + " -l " + jmeter_result + " -R "  +  iplist
        else:
            cmd = jmeter_path + " -n -t " + jmeter_config + " -l " + jmeter_result
        logging.info(cmd)
        popen = subprocess.Popen(cmd, stdout=subprocess.PIPE, universal_newlines=True,shell=True)
        for stdout_line in iter(popen.stdout.readline, ""):
            log_line = rpc_pb2.Content(text=stdout_line)
            yield log_line
        popen.stdout.close()
        return_code = popen.wait()
        if return_code:
            logging.warn(return_code)
            raise subprocess.CalledProcessError(return_code, cmd)



def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    rpc_pb2_grpc.add_RPCServicer_to_server(Performance(),server)
    server.add_insecure_port('[::]:50051')
    server.start()
    try:
        while True:
            time.sleep(_ONE_DAY_IN_SECONDS)
    except KeyboardInterrupt:
        server.stop(0)


if __name__ == '__main__':
    serve()

client端:

# -*- encoding=utf-8 -*-
from concurrent import futures
import time
import subprocess
import codecs
import sys
import os
import logging
import json
import grpc
import rpc_pb2
import rpc_pb2_grpc

logging.basicConfig(level=logging.DEBUG,
    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
    datefmt='%a, %d %b %Y %H:%M:%S',
    filename='rpc.log',
    filemode='a+')

rpc_server = r'192.168.18.128'
rpc_port = '50051'
jmeter_config = os.path.join(os.getcwd(),r'conf/config.jmx')
jmeter_path = r'/usr/local/apache-jmeter-3.2/bin/jmeter'
jmeter_result = os.path.join(os.getcwd(),r'conf/result.jtl')

class performance():

  '''性能测试的客户端接口'''

  def __init__(self,ip,port):

    '''初始化,连接RPC服务'''

    logging.info("performance_client init")
    conn = grpc.insecure_channel(ip + ':' + port)
    self.stub_client = rpc_pb2_grpc.RPCStub(channel=conn)

  def sendConfig(self,filename):
      file_handle = codecs.open(filename, 'r', encoding='utf-8')
      content = file_handle.read()
      '''向RPC server发送测试的配置文件'''
      response =  self.stub_client.sendConfFile(rpc_pb2.Content(text=content))
      return response.code


  def runJmeter(self,iplist):

      '''运行测试 返回一个生成器,内容为测试过程中的实时输出'''
      content = iplist
      logging.info("iplist %s" %content)
      for log in self.stub_client.runJMeter(rpc_pb2.Content(text=content)):
          yield log


if __name__ == '__main__':
    client = performance(rpc_server,rpc_port)
    code = client.sendConfig('test.jmx')
    iplist = r'10.1.1.1,10.1.1.2'
    for real_time_results in client.runJmeter(None):
        print "get realtime log from server : %s" % real_time_results.text

结果:

wKioL1kybpPh0wajAAA2IbnRzKk342.png-wh_50

5、基于grcp协议获取jmeter最终压测报告,并将报告保存至client端

6、压测中途停止jmeter

接口文件

syntax = "proto3";
package rpc;


service RPC {
// send config jmx
    rpc sendConfFile(Content) returns (Status) {}
//  run jmeter test
    rpc runJMeter(Content) returns (stream Content) {}
//  generateResult
    rpc generateResult(empty) returns (stream Content) {}
//  getResult
    rpc getResult(empty) returns (stream Content) {}
//  getResult
    rpc stopJMeter(empty) returns (stream Content) {}
}

message  Content {
  string  text = 1;
}

message Status {
    int64 code = 1;
}
message empty {

}

server端:

# -*- encoding=utf-8 -*-
from concurrent import futures
import time
import subprocess
import codecs
import sys
import os
import json
import logging
import grpc
import rpc_pb2
import rpc_pb2_grpc

logging.basicConfig(level=logging.DEBUG,
    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
    datefmt='%a, %d %b %Y %H:%M:%S',
    filename='rpc_server.log',
    filemode='a+')

_ONE_DAY_IN_SECONDS = 60 * 60 * 24
jmeter_config = os.path.join(os.getcwd(),r'conf/config.jmx')
jmeter_path = r'/usr/local/apache-jmeter-3.2/bin/jmeter'
jmeter_result_file = os.path.join(os.getcwd(),r'result/result.jtl')
jmeter_result_dir = os.path.join(os.getcwd(),r'result/summary/')

class Performance(rpc_pb2_grpc.RPCServicer):

    def sendConfFile(self, content,context):
        ''' 保存配置文件,如config.jmx '''
        text = content.text
        try:
            conf_handle = codecs.open(jmeter_config,'w',encoding='utf-8')
            conf_handle.write(text)
            logging.info("sendConfFile Success!")
            return rpc_pb2.Status(code=0)
        except Exception,e:
            print e
            return rpc_pb2.Status(code=1)

    def runJMeter(self, content,context):
        logging.info("begin runJmeter.")
        '''执行测试任务,并将实时结果返回'''
        iplist = content.text
        if iplist:
            cmd = jmeter_path + " -n -t " + jmeter_config + " -l " + jmeter_result_file + " -R "  +  iplist
        else:
            cmd = jmeter_path + " -n -t " + jmeter_config + " -l " + jmeter_result_file
        logging.info(cmd)
        popen = subprocess.Popen(cmd, stdout=subprocess.PIPE, universal_newlines=True,shell=True)
        for stdout_line in iter(popen.stdout.readline, ""):
            log_line = rpc_pb2.Content(text=stdout_line)
            yield log_line
        popen.stdout.close()
        return_code = popen.wait()
        if return_code:
            logging.warn(return_code)
            raise subprocess.CalledProcessError(return_code, cmd)

    def generateResult(self,empty,context):
        '''调用jmeter,生成汇总测试结果
        '''
        cmd = jmeter_path + " -g " + jmeter_result_file + " -o " +  jmeter_result_dir
        logging.info(cmd)
        popen = subprocess.Popen(cmd, stdout=subprocess.PIPE,shell=True)
        return_code = popen.wait()
        return rpc_pb2.Status(code=return_code)

    def getResult(self, empty, content):
        '''获取汇总测试结果,返回给客户端
        '''
        summary_file = os.path.join(jmeter_result_dir,r"content/js/dashboard.js")
        file_handle = codecs.open(summary_file,'r',encoding='utf-8')
        for line in file_handle.readlines():
            yield rpc_pb2.Content(text=line)
        file_handle.close()

    def stopJMeter(self, empty, content):
        '''杀死正在执行的任务'''
        os.system("ps -ef | grep jmeter | grep -v grep  | awk '{print $2}'| xargs kill -9")
        return rpc_pb2.Status(code=1)



def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    rpc_pb2_grpc.add_RPCServicer_to_server(Performance(),server)
    server.add_insecure_port('[::]:50051')
    server.start()
    try:
        while True:
            time.sleep(_ONE_DAY_IN_SECONDS)
    except KeyboardInterrupt:
        server.stop(0)


if __name__ == '__main__':
    serve()

client端:

# -*- encoding=utf-8 -*-
from concurrent import futures
import time
import subprocess
import codecs
import sys
import os
import logging
import json
import grpc
import rpc_pb2
import rpc_pb2_grpc

logging.basicConfig(level=logging.DEBUG,
    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
    datefmt='%a, %d %b %Y %H:%M:%S',
    filename='rpc.log',
    filemode='a+')

rpc_server = r'192.168.18.128'
rpc_port = '50051'
jmeter_config = os.path.join(os.getcwd(),r'conf/config.jmx')
jmeter_path = r'/usr/local/apache-jmeter-3.2/bin/jmeter'
jmeter_result = os.path.join(os.getcwd(),r'conf/result.jtl')

class performance():

  '''性能测试的客户端接口'''

  def __init__(self,ip,port):

    '''初始化,连接RPC服务'''

    logging.info("performance_client init")
    conn = grpc.insecure_channel(ip + ':' + port)
    self.stub_client = rpc_pb2_grpc.RPCStub(channel=conn)

  def sendConfig(self,filename):
      file_handle = codecs.open(filename, 'r', encoding='utf-8')
      content = file_handle.read()
      '''向RPC server发送测试的配置文件'''
      response =  self.stub_client.sendConfFile(rpc_pb2.Content(text=content))
      return response.code


  def runJmeter(self,iplist):

      '''运行测试 返回一个生成器,内容为测试过程中的实时输出'''
      content = iplist
      logging.info("iplist %s" %content)
      for log in self.stub_client.runJMeter(rpc_pb2.Content(text=content)):
          yield log

  def generateResult(self):
      '''在rpc server 端生成 jmeter 最终报告'''
      response = self.stub_client.generateResult(rpc_pb2.empty())
      return response.code

  def getResult(self, local_file):
      '''从RPC server回传测试结果,保存到本地文件local_file'''
      file_handle = codecs.open(local_file, 'w', encoding='utf-8')
      for line in self.stub_client.getResult(rpc_pb2.empty()):
          file_handle.write(line.text)
      file_handle.close()

  def stopJmeter(self):
    '''终止测试任务
    '''
    self.stub_client.stopJMeter(rpc_pb2.empty())

if __name__ == '__main__':
    client = performance(rpc_server,rpc_port)
    code = client.sendConfig('test.jmx')
    iplist = r'10.1.1.1,10.1.1.2'
    for real_time_results in client.runJmeter(None):
        print "get realtime log from server : %s" % real_time_results.text
    print client.generateResult()
    print client.getResult('2222222222222222')
    client.stopJmeter()

结果:

wKiom1kyhayBlly7AAAdbAQudMk914.png-wh_50

当你想停止jmeter压测,调用stop即可。

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

python grpc 应用

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

python grpc 应用

1、rpc介绍2、grpc3、基于grpc协议文件传输4、基于grpc协议jmeter压测获取实时结果5、基于grcp协议获取jmeter最终压测报告,并将报告保存至client端6、压测中途停止jmetergrpc server (jme
2023-01-31

python使用grpc,并打包成pyt

xmlrpc也是可行的方案,也相对更加简单一、环境python3.6 二、安装模块pip3 install grpciopip3 install protobufpip3 install grpcio-tools三、准备grpc配置文件gr
2023-01-31

grpc python 和Java实现

Grpc实战教程:说一下目的:实现Python变成的服务端,Java作为客户端,实现二者的通信,实现的功能:传递过来的字符串全部转换为大写 一、安装(Java和Python)1、  Python安装grpc1)      gRPC 的安装,
2023-01-31

Grpc 跨语言远程调用 python

grpc介绍gRPC 一开始由 google 开发,是一款语言中立、平台中立、开源的远程过程调用(RPC)系统。在 gRPC 里客户端应用可以像调用本地对象一样直接调用另一台不同的机器上服务端应用的方法,使得您能够更容易地创建分布式应用和服
2023-01-31

python中gRPC的作用是什么

本篇文章给大家分享的是有关python中gRPC的作用是什么,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。python的五大特点是什么python的五大特点:1.简单易学,开发
2023-06-14

如何用Python搭建gRPC服务

目录一、概述二、安装python需要的库三、定义gRPC的接口四、使用 protoc 和相应的插件编译生成对应语言的代码五、编写grpc的服务端代码六、编写gRPC客户端的代码七、调用测试八、gRPC的使用总结一、概述 一个gRPC服务的大
2022-06-02

怎么使用Python搭建gRPC服务

这篇文章主要介绍了怎么使用Python搭建gRPC服务,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。一、概述一个gRPC服务的大体结构图为:图一表明,grpc的服务是跨语言的
2023-06-20

怎么用Python编写简单的gRPC服务

这篇文章主要介绍了怎么用Python编写简单的gRPC服务,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。gRPC 是可以在任何环境中运行的现代开源高性能 RPC 框架。它可以
2023-06-20

怎么用Python语言的grpc实现消息传送

这篇“怎么用Python语言的grpc实现消息传送”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“怎么用Python语言的gr
2023-07-05

golang grpc配置使用实战

GogRPC配置使用实战本文深入探讨了GogRPC的配置,涵盖构建微服务和分布式系统所需的服务器和客户端配置。从设置并发流数到启用TLS,从超时选项到负载均衡和压缩,本文提供了全面指南。通过示例代码和详细说明,开发人员可以优化其gRPC应用的性能、安全性、可靠性和可扩展性,以满足特定需求。
golang grpc配置使用实战
2024-04-23

Go中的gRPC怎么用

这篇文章主要讲解了“Go中的gRPC怎么用”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Go中的gRPC怎么用”吧!Go GRPC 入门1,安装包grpcgolang-grpc 包提供了 g
2023-06-29

grpc官方文档实验与翻译(python

tensorflow分布式与tensorflow serving底层通信都是是用的grpc,所以就看了一下grpc的基本用法(python版)首先是环境的安装,先要更新pip到version8或者以上$ python -m pip inst
2023-01-31

用Python编写简单的gRPC服务的详细过程

gRPC 是可以在任何环境中运行的现代开源高性能 RPC 框架。它可以通过可插拔的支持来有效地连接数据中心内和跨数据中心的服务,以实现负载平衡,跟踪,运行状况检查和身份验证。它也适用于分布式计算的最后一英里,以将设备,移动应用程序和浏览器连
2022-06-02

golang下grpc框架怎么使用

今天小编给大家分享一下golang下grpc框架怎么使用的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一下吧。1. 什么是grpc和
2023-06-30

SpringBoot集成Dubbo启用gRPC协议

这篇文章主要介绍了SpringBoot集成Dubbo启用gRPC协议,以及与原生gRPC在代码编写过程中的区别。感兴趣的同学可以参考阅读
2023-05-15

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录