搜搜吧

查看: 618|回复: 0

远程写入prometheus存储

[复制链接]

超级钻石贵宾会员

1万

主题

2万

帖子

3万

搜搜币

Rank: 1

UID
15343
威望
-561
在线时间
393 小时
注册时间
2015-10-12
发表于 2021-7-13 14:58:01 | 显示全部楼层 |阅读模式


简介

prometheus一般都是采用pull方式获取数据,但是有一些情况下,不方便配置exporter,就希望能通过push的方式上传指标数据。

1、可以采用pushgateway的方式,推送到pushgateway,然后prometheus通过pushgateway拉取数据。

2、在新版本中增加了一个参数:--enable-feature=remote-write-receiver,允许远程通过接口/api/v1/write,直接写数据到prometheus里面。

pushgateway在高并发的情况下还是比较消耗资源的,特别是开启一致性检查,高并发写入的时候特别慢。

第二种方式少了一层转发,速度应该比较快。


接口

可以通过prometheus的http接口/api/v1/write提交数据,这个接口的数据格式有有要求:

  • 使用POST方式提交
  • 需要经过protobuf编码,依赖github.com/gogo/protobuf/proto
  • 可以使用snappy进行压缩,依赖github.com/golang/snappy

步骤:

  • 收集指标名称,时间戳,值和标签
  • 将数据转换成prometheus需要的数据格式
  • 使用proto对数据进行编码,并用snappy进行压缩
  • 通过httpClient提交数据
  • package prome
  • import (
  •     "bufio"
  •     "bytes"
  •     "context"
  •     "io"
  •     "io/ioutil"
  •     "net/http"
  •     "net/url"
  •     "regexp"
  •     "time"
  •     "github.com/gogo/protobuf/proto"
  •     "github.com/golang/snappy"
  •     "github.com/opentracing-contrib/go-stdlib/nethttp"
  •     opentracing "github.com/opentracing/opentracing-go"
  •     "github.com/pkg/errors"
  •     "github.com/prometheus/common/model"
  •     "github.com/prometheus/prometheus/pkg/labels"
  •     "github.com/prometheus/prometheus/prompb"
  • )
  • type RecoverableError struct {
  •     error
  • }
  • type HttpClient struct {
  •     url     *url.URL
  •     Client  *http.Client
  •     timeout time.Duration
  • }
  • var MetricNameRE = regexp.MustCompile(`^[a-zA-Z_:][a-zA-Z0-9_:]*$`)
  • type MetricPoint struct {
  •     Metric  string            `json:"metric"` // 指标名称
  •     TagsMap map[string]string `json:"tags"`   // 数据标签
  •     Time    int64             `json:"time"`   // 时间戳,单位是秒
  •     Value   float64           `json:"value"`  // 内部字段,最终转换之后的float64数值
  • }
  • func (c *HttpClient) remoteWritePost(req []byte) error {
  •     httpReq, err := http.NewRequest("POST", c.url.String(), bytes.NewReader(req))
  •     if err != nil {
  •         return err
  •     }
  •     httpReq.Header.Add("Content-Encoding", "snappy")
  •     httpReq.Header.Set("Content-Type", "application/x-protobuf")
  •     httpReq.Header.Set("User-Agent", "opcai")
  •     httpReq.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
  •     ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
  •     defer cancel()
  •     httpReq = httpReq.WithContext(ctx)
  •     if parentSpan := opentracing.SpanFromContext(ctx); parentSpan != nil {
  •         var ht *nethttp.Tracer
  •         httpReq, ht = nethttp.TraceRequest(
  •             parentSpan.Tracer(),
  •             httpReq,
  •             nethttp.OperationName("Remote Store"),
  •             nethttp.ClientTrace(false),
  •         )
  •         defer ht.Finish()
  •     }
  •     httpResp, err := c.Client.Do(httpReq)
  •     if err != nil {
  •         // Errors from Client.Do are from (for example) network errors, so are
  •         // recoverable.
  •         return RecoverableError{err}
  •     }
  •     defer func() {
  •         io.Copy(ioutil.Discard, httpResp.Body)
  •         httpResp.Body.Close()
  •     }()
  •     if httpResp.StatusCode/100 != 2 {
  •         scanner := bufio.NewScanner(io.LimitReader(httpResp.Body, 512))
  •         line := ""
  •         if scanner.Scan() {
  •             line = scanner.Text()
  •         }
  •         err = errors.Errorf("server returned HTTP status %s: %s", httpResp.Status, line)
  •     }
  •     if httpResp.StatusCode/100 == 5 {
  •         return RecoverableError{err}
  •     }
  •     return err
  • }
  • func buildWriteRequest(samples []*prompb.TimeSeries) ([]byte, error) {
  •     req := &prompb.WriteRequest{
  •         Timeseries: samples,
  •     }
  •     data, err := proto.Marshal(req)
  •     if err != nil {
  •         return nil, err
  •     }
  •     compressed := snappy.Encode(nil, data)
  •     return compressed, nil
  • }
  • type sample struct {
  •     labels labels.Labels
  •     t      int64
  •     v      float64
  • }
  • const (
  •     LABEL_NAME = "__name__"
  • )
  • func convertOne(item *MetricPoint) (*prompb.TimeSeries, error) {
  •     pt := prompb.TimeSeries{}
  •     pt.Samples = []prompb.Sample{{}}
  •     s := sample{}
  •     s.t = item.Time
  •     s.v = item.Value
  •     // name
  •     if !MetricNameRE.MatchString(item.Metric) {
  •         return &pt, errors.New("invalid metrics name")
  •     }
  •     nameLs := labels.Label{
  •         Name:  LABEL_NAME,
  •         Value: item.Metric,
  •     }
  •     s.labels = append(s.labels, nameLs)
  •     for k, v := range item.TagsMap {
  •         if model.LabelNameRE.MatchString(k) {
  •             ls := labels.Label{
  •                 Name:  k,
  •                 Value: v,
  •             }
  •             s.labels = append(s.labels, ls)
  •         }
  •     }
  •     pt.Labels = labelsToLabelsProto(s.labels, pt.Labels)
  •     // 时间赋值问题,使用毫秒时间戳
  •     tsMs := time.Unix(s.t, 0).UnixNano() / 1e6
  •     pt.Samples[0].Timestamp = tsMs
  •     pt.Samples[0].Value = s.v
  •     return &pt, nil
  • }
  • func labelsToLabelsProto(labels labels.Labels, buf []*prompb.Label) []*prompb.Label {
  •     result := buf[:0]
  •     if cap(buf) < len(labels) {
  •         result = make([]*prompb.Label, 0, len(labels))
  •     }
  •     for _, l := range labels {
  •         result = append(result, &prompb.Label{
  •             Name:  l.Name,
  •             Value: l.Value,
  •         })
  •     }
  •     return result
  • }
  • func (c *HttpClient) RemoteWrite(items []MetricPoint) (err error) {
  •     if len(items) == 0 {
  •         return
  •     }
  •     ts := make([]*prompb.TimeSeries, len(items))
  •     for i := range items {
  •         ts, err = convertOne(&items)
  •         if err != nil {
  •             return
  •         }
  •     }
  •     data, err := buildWriteRequest(ts)
  •     if err != nil {
  •         return
  •     }
  •     err = c.remoteWritePost(data)
  •     return
  • }
  • func NewClient(ur string, timeout time.Duration) (c *HttpClient, err error) {
  •     u, err := url.Parse(ur)
  •     if err != nil {
  •         return
  •     }
  •     c = &HttpClient{
  •         url:     u,
  •         Client:  &http.Client{},
  •         timeout: timeout,
  •     }
  •     return
  • }
测试

prometheus启动的时候记得加参数--enable-feature=remote-write-receiver

  • package prome
  • import (
  •     "testing"
  •     "time"
  • )
  • func TestRemoteWrite(t *testing.T) {
  •     c, err := NewClient("http://localhost:9090/api/v1/write", 10*time.Second)
  •     if err != nil {
  •         t.Fatal(err)
  •     }
  •     metrics := []MetricPoint{
  •         {Metric: "opcai1",
  •             TagsMap: map[string]string{"env": "testing", "op": "opcai"},
  •             Time:    time.Now().Add(-1 * time.Minute).Unix(),
  •             Value:   1},
  •         {Metric: "opcai2",
  •             TagsMap: map[string]string{"env": "testing", "op": "opcai"},
  •             Time:    time.Now().Add(-2 * time.Minute).Unix(),
  •             Value:   2},
  •         {Metric: "opcai3",
  •             TagsMap: map[string]string{"env": "testing", "op": "opcai"},
  •             Time:    time.Now().Unix(),
  •             Value:   3},
  •         {Metric: "opcai4",
  •             TagsMap: map[string]string{"env": "testing", "op": "opcai"},
  •             Time:    time.Now().Unix(),
  •             Value:   4},
  •     }
  •     err = c.RemoteWrite(metrics)
  •     if err != nil {
  •         t.Fatal(err)
  •     }
  •     t.Log("end...")
  • }

使用go test进行测试

  • go test -v
总结

这个方法也是在看夜莺v5的代码的时候发现的,刚好有需要统一收集redis的监控指标,刚好可以用上,之前用pushgateway写的实在是慢。


过年了,祝各位新年快乐
Powered by www.sosoba.org Copyright © 2013-2021 搜搜吧社区 小黑屋|手机版|Archiver|地图|联系站长|腾讯云代金券|公共DNS|seo优化服务|搜搜吧
广告服务/项目合作/会员购买:QQ 侵权举报邮箱: fuwu-sosoba@qq.com 举报流程必看 搜搜吧建站时间:创建于2013年07月23日
免责声明:本站所有的内容均来自互联网以及第三方作者自由发布,版权归原作者版权所有,搜搜吧不承担任何的法律责任,若有侵权请来信告知,我们立即删除!
版权声明:搜搜吧影视资源均收集自互联网,没有提供影片资源存储和下载,也未参与录制上传,若本站收录的资源涉及您的版权或知识产权或其他利益,我们会立即删除

GMT+8, 2021-9-29 10:35 , Processed in 0.029938 second(s), 6 queries , Redis On.

快速回复 返回顶部 返回列表