diff --git a/README.md b/README.md index eaf7bb6..59f0529 100644 --- a/README.md +++ b/README.md @@ -15,6 +15,7 @@ > 11.增加简体中文与utf-8编码互转函数,不管是发送还是接受都随意对字符进行编码转换. > 12.增加 `XML` 格式数据提交,方便对接java类语言开发的 `webservice` 接口. > 13.创建 `httpClient` 对象时使用 `sync.pool` 临时对象池,使客户端的创建更加高效,服务器资源占用更低,满足开发者频繁创建客户端采集数据. +> 14.增加 `sse` 客户端, 用于支持、处理 `h5` 推出的 `sse` 数据推送技术 . ### 安装 goCurl 包 ```code @@ -46,7 +47,7 @@ go get github.com/qifengzhang007/goCurl@v1.3.8 BaseURI string FormParams map[string]interface{} JSON interface{} - Timeout float32 // 超时时间,单位:秒 + Timeout float32 // 超时时间,单位:秒, 如果不设置或者设置为 0 表示程序一直等待不自动中断 Cookies interface{} Proxy string // 如果请求的站点响应头 Header["Content-Type"] 中没有明确的 charset=utf-8 、charset=gb2312 等 diff --git a/goCurl.go b/go_curl.go similarity index 98% rename from goCurl.go rename to go_curl.go index b7930e4..357bc09 100644 --- a/goCurl.go +++ b/go_curl.go @@ -55,7 +55,7 @@ func mergeDefaultParams(defaultHeaders Options, options ...Options) Options { options[0].BaseURI = options[1].BaseURI } - if options[0].Timeout <= 0 && options[1].Timeout > 0 { + if options[0].Timeout <= 0 && options[1].Timeout >= 0 { options[0].Timeout = options[1].Timeout } if options[0].Proxy == "" && options[1].Proxy != "" { diff --git a/request.go b/request.go index 7ab6741..a044a01 100644 --- a/request.go +++ b/request.go @@ -33,7 +33,7 @@ func (r *Request) Get(uri string, opts ...Options) (*Response, error) { return r.Request("GET", uri, opts...) } -// Get method download files +// Down method download files func (r *Request) Down(resourceUrl string, savePath, saveName string, opts ...Options) (bool, error) { var vError error var vResponse *Response @@ -64,12 +64,12 @@ func (r *Request) saveFile(body io.ReadCloser, fileName string) (bool, error) { _ = body.Close() _ = file.Close() }() - reader := bufio.NewReaderSize(body, 1024*50) //相当于一个临时缓冲区(设置为可以单次存储5M的文件),每次读取以后就把原始数据重新加载一份,等待下一次读取 + reader := bufio.NewReader(body) if err != nil { return false, err } writer := bufio.NewWriter(file) - buff := make([]byte, 50*1024) + buff := make([]byte, 4096) for { currReadSize, readerErr := reader.Read(buff) @@ -114,6 +114,42 @@ func (r *Request) Delete(uri string, opts ...Options) (*Response, error) { return r.Request("DELETE", uri, opts...) } +// SseGet sse客户端通过get请求,持续获取服务端推送的数据流 +func (r *Request) SseGet(uri string, fn func(msgType, content string) bool) error { + headerOpt := Options{ + Headers: map[string]interface{}{ + "Content-Type": "text/event-stream", + "Cache-Control": "no-cache", + "Connection": "keep-alive", + }, + Timeout: -1, + } + if resp, err := r.Get(uri, headerOpt); err == nil { + body := resp.GetBody() + defer func() { + _ = body.Close() + }() + ioReader := bufio.NewReader(body) + for { + if bys, err := ioReader.ReadBytes('\n'); err == nil && len(bys) > 4 { + delim := []byte{':', ' '} + byteSliceSlice := bytes.Split(bys, delim) + if len(byteSliceSlice) == 2 { + if !fn(string(byteSliceSlice[0]), string(byteSliceSlice[1])) { + return nil + } + } + } else { + // 如果ioreader关联的缓冲区没有内容,通过休眠5毫秒让出协程(避免死循环导致cpu占用率过高) + // 相对网络请求的耗时, 3ms 时间几乎不构成任何影响 + time.Sleep(time.Millisecond * 3) + } + } + } else { + return errors.New(err.Error()) + } +} + // Options send options request func (r *Request) Options(uri string, opts ...Options) (*Response, error) { return r.Request("OPTIONS", uri, opts...) @@ -176,10 +212,11 @@ func (r *Request) Request(method, uri string, opts ...Options) (*Response, error } func (r *Request) parseTimeout() { - if r.opts.Timeout == 0 { - r.opts.Timeout = 30 + if r.opts.Timeout > 0 { + r.opts.timeout = time.Duration(r.opts.Timeout*1000) * time.Millisecond + } else { + r.opts.Timeout = 0 } - r.opts.timeout = time.Duration(r.opts.Timeout*1000) * time.Millisecond } func (r *Request) parseClient() { @@ -308,7 +345,7 @@ func (r *Request) parseGetFormData() string { } } -//(接受到的)简体中文 转换为 utf-8 +// (接受到的)简体中文 转换为 utf-8 func (r *Request) SimpleChineseToUtf8(vBytes []byte) string { return mahonia.NewDecoder("GB18030").ConvertString(string(vBytes)) } diff --git a/response.go b/response.go index 9354691..5203fa7 100644 --- a/response.go +++ b/response.go @@ -79,15 +79,12 @@ func (r *Response) GetContents() (bodyStr string, err error) { return bodyStr, nil } -// Get Response ContentLength func (r *Response) GetContentLength() int64 { return r.resp.ContentLength } // GetBody parse response body func (r *Response) GetBody() io.ReadCloser { - //defer r.resp.Body.Close() - return r.resp.Body } diff --git a/test/request_test.go b/test/request_test.go index 1ae59d0..caf644e 100644 --- a/test/request_test.go +++ b/test/request_test.go @@ -8,7 +8,8 @@ import ( "testing" ) -// get 网站编码为 gbk +// get 网站编码为 gbk +// // 主要测试 get 请求以及自动转换被采集网站的编码,保证返回的数据是正常的 func TestRequestGet(t *testing.T) { @@ -40,26 +41,33 @@ func TestRequestGet(t *testing.T) { func TestRequestGet2(t *testing.T) { // 创建 http 客户端的时候可以直接填充一些公共参数,后续请求会复用 - cli := goCurl.CreateHttpClient() - resp, err := cli.Get("http://49.232.145.118:20171/api/v1/portal/news?newsType=10&page=1&limit=50") - //t.Logf("请求参数:%v\n", resp.GetRequest()) - if err != nil && resp == nil { - t.Errorf("单元测试失败,错误明细:%s\n", err.Error()) - } - if err != nil { - t.Errorf("请求出错:%s\n", err.Error()) - } else { - txt, err := resp.GetContents() - if err == nil { - t.Logf("请求结果:%s\n", txt) - } else { + cli := goCurl.CreateHttpClient(goCurl.Options{ + Headers: map[string]interface{}{ + "Connection": "keep-alive", + }, + }) + for i := 1; i < 5; i++ { + resp, err := cli.Get("http://49.232.145.118:20171/api/v1/portal/news?newsType=10&page=1&limit=50") + //t.Logf("请求参数:%v\n", resp.GetRequest()) + if err != nil && resp == nil { t.Errorf("单元测试失败,错误明细:%s\n", err.Error()) } + if err != nil { + t.Errorf("请求出错:%s\n", err.Error()) + } else { + txt, err := resp.GetContents() + if err == nil { + t.Logf("请求结果:%s\n", txt) + } else { + t.Errorf("单元测试失败,错误明细:%s\n", err.Error()) + } + } } + } -// https 以及 表单参数 -// get请求参数如果不是特别长,建议和地址拼接在一起请求,例如: https://www.oschina.net/search?scope=project&q=golang +// https 以及 表单参数 +// get请求参数如果不是特别长,建议和地址拼接在一起请求,例如: https://www.oschina.net/search?scope=project&q=golang func TestRequestGetWithQuery(t *testing.T) { cli := goCurl.CreateHttpClient() // cli.Get 切换成 cli.Post 就是 post 方式提交表单参数 @@ -112,8 +120,8 @@ func TestRequestSendChinese(t *testing.T) { } -// post提交 json 数据 -// 注意:这里的 header 头字段 Content-Type 必须设置为 application/json 格式 +// post提交 json 数据 +// 注意:这里的 header 头字段 Content-Type 必须设置为 application/json 格式 func TestRequestPostWithJSON(t *testing.T) { cli := goCurl.CreateHttpClient() @@ -139,8 +147,9 @@ func TestRequestPostWithJSON(t *testing.T) { } } -// post向 webservice接口提交 xml 数据(以表单参数形式提交x-www-form-urlencoded) -// webservice测试地址以及接口说明:http://www.webxml.com.cn/WebServices/ChinaZipSearchWebService.asmx/getSupportCity +// post向 webservice接口提交 xml 数据(以表单参数形式提交x-www-form-urlencoded) +// webservice测试地址以及接口说明:http://www.webxml.com.cn/WebServices/ChinaZipSearchWebService.asmx/getSupportCity +// // 浏览器打开以上地址,F12 可以查看webservice 接口以表单形式是如何发送数据的 func TestRequestPostFormDataWithXml(t *testing.T) { cli := goCurl.CreateHttpClient() @@ -167,8 +176,8 @@ func TestRequestPostFormDataWithXml(t *testing.T) { } } -// post向 webservice接口提交 xml 数据(以raw方式提交) -// webservice测试地址以及接口说明:http://www.webxml.com.cn/WebServices/ChinaZipSearchWebService.asmx +// post向 webservice接口提交 xml 数据(以raw方式提交) +// webservice测试地址以及接口说明:http://www.webxml.com.cn/WebServices/ChinaZipSearchWebService.asmx func TestRequestPostRawWithXml(t *testing.T) { cli := goCurl.CreateHttpClient(goCurl.Options{ SetResCharset: "utf-8", @@ -217,8 +226,8 @@ func TestRequestGetWithProxy(t *testing.T) { cli := goCurl.CreateHttpClient() resp, err := cli.Get("http://myip.top/", goCurl.Options{ - Timeout: 5.0, - Proxy: "http://39.96.11.196:3211", // 该ip需要自己去申请每日免费试用 + Timeout: 60, + Proxy: "http://113.241.137.248:4330", // 该ip需要自己去申请每日免费试用 }) if err != nil { t.Errorf("请求出错:%s\n", err.Error()) @@ -313,7 +322,7 @@ func TestRequestPostWithCookiesMap(t *testing.T) { } } -// Put 方式提交数据 +// Put 方式提交数据 func TestRequestPut(t *testing.T) { cli := goCurl.CreateHttpClient() @@ -330,7 +339,7 @@ func TestRequestPut(t *testing.T) { } } -// Delete方式提交数据 +// Delete方式提交数据 func TestRequestDelete(t *testing.T) { cli := goCurl.CreateHttpClient() @@ -345,3 +354,30 @@ func TestRequestDelete(t *testing.T) { t.Errorf("单元测试失败,错误明细:%s\n", err.Error()) } } + +// SseGet 通过sse客户端的get请求获取服务端持续推送的数据流 +func TestRequestSseGet(t *testing.T) { + sseServerUrl := "https://92.push2.eastmoney.com/api/qt/stock/details/sse?fields1=f1,f2,f3,f4&fields2=f51,f52,f53,f54,f55&mpi=2000&ut=bd1d9ddb04089700cf9c27f6f7426281&fltt=2&pos=-0&secid=1.600460&wbp2u=|0|0|0|web" + cli := goCurl.CreateHttpClient() + + // SseGet 方法会阻塞目前的代码,如果需要异步接收处理sseClient收到的消息,请使用go协程启动该方法 + err := cli.SseGet(sseServerUrl, func(msgType, content string) bool { + + switch msgType { + case "event": + // 事件类型的消息格式 + t.Logf("(event)事件类型的消息:\n%+v\n", content) + case "data": + // 数据类型的消息格式 + t.Logf("服务端推送的业务数据(data):\n%+v\n", content) + } + + // 这里是回调函数的返回值: + // true 表示持续接受服务端的推送数据, + // false 表示只接受一次服务端的推送数据后,主动关闭客户端不在接受后续数据 + return true + }) + if err != nil { + t.Errorf("单元测试失败,错误明细:%s\n", err.Error()) + } +}