示例转自网络,仅用于个人学习,原文连接在reference部分给出。

示例1

import psutil
import socket
from prometheus_client import Gauge,start_http_server
from time import sleep

g = Gauge('cup_use_percent_test_metric', 'Description of gauge',['hostip'])
host_ip = socket.gethostbyname(socket.getfqdn(socket.gethostname()))      #获取本机IP

def get_cup_use():
    cup_use_percent = psutil.cpu_percent(0.5)            #获取CPU使用率
    g.labels(hostip=host_ip).set(cup_use_percent)       #本机IP传入labels,CPU使用率传入value

if __name__ == '__main__':
    start_http_server(8006)           #8006端口启动
    while True:
        get_cup_use()
        sleep(10)

设置标签

from prometheus_client import Counter
c = Counter('my_requests_total', 'HTTP Failures', ['method', 'endpoint'])
c.labels('get', '/').inc()
c.labels('post', '/submit').inc()
#Labels can also be passed as keyword-arguments:

from prometheus_client import Counter
c = Counter('my_requests_total', 'HTTP Failures', ['method', 'endpoint'])
c.labels(method='get', endpoint='/').inc()
c.labels(method='post', endpoint='/submit').inc()

#Gauge example:
from prometheus_client import Gauge
g = Gauge('my_inprogress_requests', 'Description of gauge',['mylabelname'])

g.labels(mylabelname='labelname').set(30)

remove方法

只能移除含有label的metric,remove里参数全部label values

g1 = Gauge('my_requests_total', 'HTTP Failures', ['method', 'endpoint'])
g1.labels(method='get', endpoint='/').set(random.random())
g1.remove('get','/')

示例2

import prometheus_client
from prometheus_client import Gauge
from prometheus_client.core import CollectorRegistry
from flask import Response, Flask
from nmap import nmap
from flask_apscheduler import APScheduler
from psutil import virtual_memory
from psutil import cpu_times

app = Flask(__name__)

scan_data = []

def scan_port():
    host_list = ['192.168.80.100', '192.168.80.101', '192.168.80.102', '192.168.80.103', '192.168.80.104',
                 '192.168.10.11']
    for i in host_list:
        nm = nmap.PortScanner()
        port = '22'
        nm.scan(i, port)
        name = nm[i]['tcp'][int(port)]['name']
        port_state = nm[i]['tcp'][int(port)]['state']
        version = nm[i]['tcp'][int(port)]['version']
        scan_data.append({'host': i, 'port': port, 'port_state': port_state, 'name': name, 'version': version})

class SchedulerConfig(object):
    JOBS = [
        {
            'id': 'update_job',
            'func': '__main__:update_job',
            'args': None,
            'trigger': 'interval',
            'seconds': 10,
        }
    ]

def update_job():
    scan_port()
    print("数据更新完成")

# 为实例化的flask引入定时任务配置
app.config.from_object(SchedulerConfig())

REGISTRY = CollectorRegistry(auto_describe=False)

host_info = Gauge("host_info", "主机信息扫描", ["host", "port", "port_state", "name", "version"], registry=REGISTRY)

mem_percent = Gauge("system_memory_percent", "内存使用率", registry=REGISTRY)

cpu_percent = Gauge("system_cpu_percent", "CPU使用率", registry=REGISTRY)

@app.route("/metrics")
def metrics():
    mem_percent.set(virtual_memory().percent)
    cpu_percent.set(cpu_times().system)

    for i in scan_data:
        host = i.get('host')
        port = i.get('port')
        port_state = i.get('port_state')
        name = i.get('name')
        version = i.get('version')
        host_info.labels(host, port, port_state, name, version)
    return Response(prometheus_client.generate_latest(REGISTRY), mimetype="text/plain")

if __name__ == "__main__":
    # 实例化APScheduler
    scheduler = APScheduler()
    # 把任务列表载入实例flask
    scheduler.init_app(app)
    # 启动任务计划
    scheduler.start()
    # 在debug模式下,Flask的重新加载器将加载两次,因此flask总共有两个进程,会导致定时任务重复执行,禁用重新加载器可解决此问题,或者debug设为false
    app.run(host="0.0.0.0", port=8000, debug=True, use_reloader=False)

Reference