Skip to content

Commit

Permalink
增加:
Browse files Browse the repository at this point in the history
1.增加 h5推出的sse客户端,用于处理服务端持续推送的数据.
bug修复:
1.程序默认请求超时时间为0秒时,重置为30秒,导致场景存在局限性,本次修正为与标注库保持一致,timeout为0秒时用不超时.
  • Loading branch information
qifengzhang007 committed Feb 18, 2023
1 parent add2b71 commit 79c35a1
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 38 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
> 11.增加简体中文与utf-8编码互转函数,不管是发送还是接受都随意对字符进行编码转换.
> 12.增加 `XML` 格式数据提交,方便对接java类语言开发的 `webservice` 接口.
> 13.创建 `httpClient` 对象时使用 `sync.pool` 临时对象池,使客户端的创建更加高效,服务器资源占用更低,满足开发者频繁创建客户端采集数据.
> 14.增加 `sse` 客户端, 用于支持、处理 `h5` 推出的 `sse` 数据推送技术 .
### 安装 goCurl 包
```code
Expand Down Expand Up @@ -46,7 +47,7 @@ go get github.com/qifengzhang007/[email protected]
BaseURI string
FormParams map[string]interface{}
JSON interface{}
Timeout float32 // 超时时间,单位:秒
Timeout float32 // 超时时间,单位:秒, 如果不设置或者设置为 0 表示程序一直等待不自动中断
Cookies interface{}
Proxy string
// 如果请求的站点响应头 Header["Content-Type"] 中没有明确的 charset=utf-8 、charset=gb2312 等
Expand Down
2 changes: 1 addition & 1 deletion goCurl.go → go_curl.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func mergeDefaultParams(defaultHeaders Options, options ...Options) Options {
options[0].BaseURI = options[1].BaseURI
}

if options[0].Timeout <= 0 && options[1].Timeout > 0 {
if options[0].Timeout <= 0 && options[1].Timeout >= 0 {
options[0].Timeout = options[1].Timeout
}
if options[0].Proxy == "" && options[1].Proxy != "" {
Expand Down
51 changes: 44 additions & 7 deletions request.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (r *Request) Get(uri string, opts ...Options) (*Response, error) {
return r.Request("GET", uri, opts...)
}

// Get method download files
// Down method download files
func (r *Request) Down(resourceUrl string, savePath, saveName string, opts ...Options) (bool, error) {
var vError error
var vResponse *Response
Expand Down Expand Up @@ -64,12 +64,12 @@ func (r *Request) saveFile(body io.ReadCloser, fileName string) (bool, error) {
_ = body.Close()
_ = file.Close()
}()
reader := bufio.NewReaderSize(body, 1024*50) //相当于一个临时缓冲区(设置为可以单次存储5M的文件),每次读取以后就把原始数据重新加载一份,等待下一次读取
reader := bufio.NewReader(body)
if err != nil {
return false, err
}
writer := bufio.NewWriter(file)
buff := make([]byte, 50*1024)
buff := make([]byte, 4096)

for {
currReadSize, readerErr := reader.Read(buff)
Expand Down Expand Up @@ -114,6 +114,42 @@ func (r *Request) Delete(uri string, opts ...Options) (*Response, error) {
return r.Request("DELETE", uri, opts...)
}

// SseGet sse客户端通过get请求,持续获取服务端推送的数据流
func (r *Request) SseGet(uri string, fn func(msgType, content string) bool) error {
headerOpt := Options{
Headers: map[string]interface{}{
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
"Connection": "keep-alive",
},
Timeout: -1,
}
if resp, err := r.Get(uri, headerOpt); err == nil {
body := resp.GetBody()
defer func() {
_ = body.Close()
}()
ioReader := bufio.NewReader(body)
for {
if bys, err := ioReader.ReadBytes('\n'); err == nil && len(bys) > 4 {
delim := []byte{':', ' '}
byteSliceSlice := bytes.Split(bys, delim)
if len(byteSliceSlice) == 2 {
if !fn(string(byteSliceSlice[0]), string(byteSliceSlice[1])) {
return nil
}
}
} else {
// 如果ioreader关联的缓冲区没有内容,通过休眠5毫秒让出协程(避免死循环导致cpu占用率过高)
// 相对网络请求的耗时, 3ms 时间几乎不构成任何影响
time.Sleep(time.Millisecond * 3)
}
}
} else {
return errors.New(err.Error())
}
}

// Options send options request
func (r *Request) Options(uri string, opts ...Options) (*Response, error) {
return r.Request("OPTIONS", uri, opts...)
Expand Down Expand Up @@ -176,10 +212,11 @@ func (r *Request) Request(method, uri string, opts ...Options) (*Response, error
}

func (r *Request) parseTimeout() {
if r.opts.Timeout == 0 {
r.opts.Timeout = 30
if r.opts.Timeout > 0 {
r.opts.timeout = time.Duration(r.opts.Timeout*1000) * time.Millisecond
} else {
r.opts.Timeout = 0
}
r.opts.timeout = time.Duration(r.opts.Timeout*1000) * time.Millisecond
}

func (r *Request) parseClient() {
Expand Down Expand Up @@ -308,7 +345,7 @@ func (r *Request) parseGetFormData() string {
}
}

//(接受到的)简体中文 转换为 utf-8
// (接受到的)简体中文 转换为 utf-8
func (r *Request) SimpleChineseToUtf8(vBytes []byte) string {
return mahonia.NewDecoder("GB18030").ConvertString(string(vBytes))
}
Expand Down
3 changes: 0 additions & 3 deletions response.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,12 @@ func (r *Response) GetContents() (bodyStr string, err error) {
return bodyStr, nil
}

// Get Response ContentLength
func (r *Response) GetContentLength() int64 {
return r.resp.ContentLength
}

// GetBody parse response body
func (r *Response) GetBody() io.ReadCloser {
//defer r.resp.Body.Close()

return r.resp.Body
}

Expand Down
88 changes: 62 additions & 26 deletions test/request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ import (
"testing"
)

// get 网站编码为 gbk
// get 网站编码为 gbk
//
// 主要测试 get 请求以及自动转换被采集网站的编码,保证返回的数据是正常的
func TestRequestGet(t *testing.T) {

Expand Down Expand Up @@ -40,26 +41,33 @@ func TestRequestGet(t *testing.T) {
func TestRequestGet2(t *testing.T) {

// 创建 http 客户端的时候可以直接填充一些公共参数,后续请求会复用
cli := goCurl.CreateHttpClient()
resp, err := cli.Get("http://49.232.145.118:20171/api/v1/portal/news?newsType=10&page=1&limit=50")
//t.Logf("请求参数:%v\n", resp.GetRequest())
if err != nil && resp == nil {
t.Errorf("单元测试失败,错误明细:%s\n", err.Error())
}
if err != nil {
t.Errorf("请求出错:%s\n", err.Error())
} else {
txt, err := resp.GetContents()
if err == nil {
t.Logf("请求结果:%s\n", txt)
} else {
cli := goCurl.CreateHttpClient(goCurl.Options{
Headers: map[string]interface{}{
"Connection": "keep-alive",
},
})
for i := 1; i < 5; i++ {
resp, err := cli.Get("http://49.232.145.118:20171/api/v1/portal/news?newsType=10&page=1&limit=50")
//t.Logf("请求参数:%v\n", resp.GetRequest())
if err != nil && resp == nil {
t.Errorf("单元测试失败,错误明细:%s\n", err.Error())
}
if err != nil {
t.Errorf("请求出错:%s\n", err.Error())
} else {
txt, err := resp.GetContents()
if err == nil {
t.Logf("请求结果:%s\n", txt)
} else {
t.Errorf("单元测试失败,错误明细:%s\n", err.Error())
}
}
}

}

// https 以及 表单参数
// get请求参数如果不是特别长,建议和地址拼接在一起请求,例如: https://www.oschina.net/search?scope=project&q=golang
// https 以及 表单参数
// get请求参数如果不是特别长,建议和地址拼接在一起请求,例如: https://www.oschina.net/search?scope=project&q=golang
func TestRequestGetWithQuery(t *testing.T) {
cli := goCurl.CreateHttpClient()
// cli.Get 切换成 cli.Post 就是 post 方式提交表单参数
Expand Down Expand Up @@ -112,8 +120,8 @@ func TestRequestSendChinese(t *testing.T) {

}

// post提交 json 数据
// 注意:这里的 header 头字段 Content-Type 必须设置为 application/json 格式
// post提交 json 数据
// 注意:这里的 header 头字段 Content-Type 必须设置为 application/json 格式
func TestRequestPostWithJSON(t *testing.T) {
cli := goCurl.CreateHttpClient()

Expand All @@ -139,8 +147,9 @@ func TestRequestPostWithJSON(t *testing.T) {
}
}

// post向 webservice接口提交 xml 数据(以表单参数形式提交x-www-form-urlencoded)
// webservice测试地址以及接口说明:http://www.webxml.com.cn/WebServices/ChinaZipSearchWebService.asmx/getSupportCity
// post向 webservice接口提交 xml 数据(以表单参数形式提交x-www-form-urlencoded)
// webservice测试地址以及接口说明:http://www.webxml.com.cn/WebServices/ChinaZipSearchWebService.asmx/getSupportCity
//
// 浏览器打开以上地址,F12 可以查看webservice 接口以表单形式是如何发送数据的
func TestRequestPostFormDataWithXml(t *testing.T) {
cli := goCurl.CreateHttpClient()
Expand All @@ -167,8 +176,8 @@ func TestRequestPostFormDataWithXml(t *testing.T) {
}
}

// post向 webservice接口提交 xml 数据(以raw方式提交)
// webservice测试地址以及接口说明:http://www.webxml.com.cn/WebServices/ChinaZipSearchWebService.asmx
// post向 webservice接口提交 xml 数据(以raw方式提交)
// webservice测试地址以及接口说明:http://www.webxml.com.cn/WebServices/ChinaZipSearchWebService.asmx
func TestRequestPostRawWithXml(t *testing.T) {
cli := goCurl.CreateHttpClient(goCurl.Options{
SetResCharset: "utf-8",
Expand Down Expand Up @@ -217,8 +226,8 @@ func TestRequestGetWithProxy(t *testing.T) {
cli := goCurl.CreateHttpClient()

resp, err := cli.Get("http://myip.top/", goCurl.Options{
Timeout: 5.0,
Proxy: "http://39.96.11.196:3211", // 该ip需要自己去申请每日免费试用
Timeout: 60,
Proxy: "http://113.241.137.248:4330", // 该ip需要自己去申请每日免费试用
})
if err != nil {
t.Errorf("请求出错:%s\n", err.Error())
Expand Down Expand Up @@ -313,7 +322,7 @@ func TestRequestPostWithCookiesMap(t *testing.T) {
}
}

// Put 方式提交数据
// Put 方式提交数据
func TestRequestPut(t *testing.T) {
cli := goCurl.CreateHttpClient()

Expand All @@ -330,7 +339,7 @@ func TestRequestPut(t *testing.T) {
}
}

// Delete方式提交数据
// Delete方式提交数据
func TestRequestDelete(t *testing.T) {
cli := goCurl.CreateHttpClient()

Expand All @@ -345,3 +354,30 @@ func TestRequestDelete(t *testing.T) {
t.Errorf("单元测试失败,错误明细:%s\n", err.Error())
}
}

// SseGet 通过sse客户端的get请求获取服务端持续推送的数据流
func TestRequestSseGet(t *testing.T) {
sseServerUrl := "https://92.push2.eastmoney.com/api/qt/stock/details/sse?fields1=f1,f2,f3,f4&fields2=f51,f52,f53,f54,f55&mpi=2000&ut=bd1d9ddb04089700cf9c27f6f7426281&fltt=2&pos=-0&secid=1.600460&wbp2u=|0|0|0|web"
cli := goCurl.CreateHttpClient()

// SseGet 方法会阻塞目前的代码,如果需要异步接收处理sseClient收到的消息,请使用go协程启动该方法
err := cli.SseGet(sseServerUrl, func(msgType, content string) bool {

switch msgType {
case "event":
// 事件类型的消息格式
t.Logf("(event)事件类型的消息:\n%+v\n", content)
case "data":
// 数据类型的消息格式
t.Logf("服务端推送的业务数据(data):\n%+v\n", content)
}

// 这里是回调函数的返回值:
// true 表示持续接受服务端的推送数据,
// false 表示只接受一次服务端的推送数据后,主动关闭客户端不在接受后续数据
return true
})
if err != nil {
t.Errorf("单元测试失败,错误明细:%s\n", err.Error())
}
}

0 comments on commit 79c35a1

Please sign in to comment.