Plan for Migrating Telemetry Data to TimescaleDB

1. TimescaleDB Table Schema Design

-- Create the TimescaleDB table schema
CREATE TABLE dms_data_gzdy (
    record_time TIMESTAMPTZ NOT NULL,  -- TIMESTAMPTZ serves as the time partitioning key
    station_name VARCHAR(255),
    feeder_gis_id VARCHAR(100),
    switch_name VARCHAR(255),
    switch_oid VARCHAR(100) NOT NULL,  -- compression segment-by key; must not be NULL
    switch_gis_id VARCHAR(100),
    switch_status INTEGER,
    switch_status_quality INTEGER,
    active_power DECIMAL(18,6),
    active_power_quality INTEGER,
    reactive_power DECIMAL(18,6),
    reactive_power_quality INTEGER,
    current_a DECIMAL(18,6),
    current_a_quality INTEGER,
    current_b DECIMAL(18,6),
    current_b_quality INTEGER,
    current_c DECIMAL(18,6),
    current_c_quality INTEGER,
    voltage_uab DECIMAL(18,6),
    voltage_uab_quality INTEGER,
    voltage_ubc DECIMAL(18,6),
    voltage_ubc_quality INTEGER,
    voltage_uca DECIMAL(18,6),
    voltage_uca_quality INTEGER,
    created_at TIMESTAMPTZ DEFAULT NOW(),
    
    -- TimescaleDB requires the time column to be part of the primary key
    PRIMARY KEY (record_time, switch_oid)
);

-- Create the hypertable (partitioned by day)
SELECT create_hypertable('dms_data_gzdy', 'record_time', chunk_time_interval => INTERVAL '1 day');
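
-- (Optional sanity check, assuming TimescaleDB 2.x.) Confirm that daily chunks
-- are being created by querying the timescaledb_information.chunks view:
SELECT chunk_name, range_start, range_end, is_compressed
FROM timescaledb_information.chunks
WHERE hypertable_name = 'dms_data_gzdy'
ORDER BY range_start DESC;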

-- Configure compression
ALTER TABLE dms_data_gzdy SET (
    timescaledb.compress,
    -- Segment by device ID so queries for one device read only its compressed blocks
    timescaledb.compress_segmentby = 'switch_oid',
    -- Order by time within each segment to keep scans ordered
    timescaledb.compress_orderby = 'record_time ASC'
);

-- Policy: automatically compress chunks older than 3 days
SELECT add_compression_policy('dms_data_gzdy', INTERVAL '3 days');
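
-- (Verification sketch, assuming TimescaleDB 2.x.) Once the policy has compressed
-- some chunks, check the actual storage savings with the built-in stats function:
SELECT pg_size_pretty(before_compression_total_bytes) AS before_size,
       pg_size_pretty(after_compression_total_bytes)  AS after_size
FROM hypertable_compression_stats('dms_data_gzdy');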

-- Indexes to optimize common query patterns
CREATE INDEX idx_dms_switch_oid ON dms_data_gzdy (switch_oid, record_time);
CREATE INDEX idx_dms_feeder_gis_id ON dms_data_gzdy (feeder_gis_id, record_time);
CREATE INDEX idx_dms_station_name ON dms_data_gzdy (station_name, record_time);
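
-- (Illustration.) A typical query these indexes and the segment-by key target:
-- 15-minute average active power for one switch over the last 7 days.
-- The OID value below is a placeholder.
SELECT time_bucket('15 minutes', record_time) AS bucket,
       avg(active_power) AS avg_active_power
FROM dms_data_gzdy
WHERE switch_oid = 'SOME_SWITCH_OID'
  AND record_time >= NOW() - INTERVAL '7 days'
GROUP BY bucket
ORDER BY bucket;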

-- Add comments
COMMENT ON TABLE dms_data_gzdy IS 'DMS EFile data table (GZDY) - TimescaleDB version';
COMMENT ON COLUMN dms_data_gzdy.record_time IS 'Record time (human-readable time from the file header)';
COMMENT ON COLUMN dms_data_gzdy.station_name IS 'Station name';
COMMENT ON COLUMN dms_data_gzdy.feeder_gis_id IS 'Feeder GIS ID';
COMMENT ON COLUMN dms_data_gzdy.switch_name IS 'Switch name';
COMMENT ON COLUMN dms_data_gzdy.switch_oid IS 'Switch OID';
COMMENT ON COLUMN dms_data_gzdy.switch_gis_id IS 'Switch GIS ID';
COMMENT ON COLUMN dms_data_gzdy.switch_status IS 'Switch status';
COMMENT ON COLUMN dms_data_gzdy.switch_status_quality IS 'Switch status quality code';
COMMENT ON COLUMN dms_data_gzdy.active_power IS 'Active power';
COMMENT ON COLUMN dms_data_gzdy.active_power_quality IS 'Active power measurement quality code';
COMMENT ON COLUMN dms_data_gzdy.reactive_power IS 'Reactive power';
COMMENT ON COLUMN dms_data_gzdy.reactive_power_quality IS 'Reactive power measurement quality code';
COMMENT ON COLUMN dms_data_gzdy.current_a IS 'Phase A current';
COMMENT ON COLUMN dms_data_gzdy.current_a_quality IS 'Phase A current quality code';
COMMENT ON COLUMN dms_data_gzdy.current_b IS 'Phase B current';
COMMENT ON COLUMN dms_data_gzdy.current_b_quality IS 'Phase B current quality code';
COMMENT ON COLUMN dms_data_gzdy.current_c IS 'Phase C current';
COMMENT ON COLUMN dms_data_gzdy.current_c_quality IS 'Phase C current quality code';
COMMENT ON COLUMN dms_data_gzdy.voltage_uab IS 'Uab voltage';
COMMENT ON COLUMN dms_data_gzdy.voltage_uab_quality IS 'Uab voltage quality code';
COMMENT ON COLUMN dms_data_gzdy.voltage_ubc IS 'Ubc voltage';
COMMENT ON COLUMN dms_data_gzdy.voltage_ubc_quality IS 'Ubc voltage quality code';
COMMENT ON COLUMN dms_data_gzdy.voltage_uca IS 'Uca voltage';
COMMENT ON COLUMN dms_data_gzdy.voltage_uca_quality IS 'Uca voltage quality code';
COMMENT ON COLUMN dms_data_gzdy.created_at IS 'Ingestion timestamp';

2. Updated PostgreSQL ETL Script

import sys
import re
import psycopg2
import psycopg2.extras
from datetime import datetime
import time

# ==========================================
# Configuration
# ==========================================

DB_CONFIG = {
    "host": "127.0.0.1",
    "port": 5432,
    "user": "smart_dms_user",
    "password": "***",
    "database": "smart_dms_db"
}

TABLE_NAME = "dms_data_gzdy"
BATCH_SIZE = 500 
MAX_RETRIES = 3

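# Maps table columns to the Chinese column headers in the source EFile.
# The values must stay in Chinese to match the input format exactly.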
FIELD_MAPPING = {
    'station_name': '厂站名',
    'feeder_gis_id': '馈线GISID',
    'switch_name': '开关名',
    'switch_oid': '开关OID',
    'switch_gis_id': '开关GISID',
    'switch_status': '开关状态',
    'switch_status_quality': '开关状态质量码',
    'active_power': '有功',
    'active_power_quality': '有功量测质量码',
    'reactive_power': '无功',
    'reactive_power_quality': '无功量测质量码',
    'current_a': 'A相电流',
    'current_a_quality': 'A相电流质量码',
    'current_b': 'B相电流',
    'current_b_quality': 'B相电流质量码',
    'current_c': 'C相电流',
    'current_c_quality': 'C相电流质量码',
    'voltage_uab': 'Uab电压',
    'voltage_uab_quality': 'Uab电压质量码',
    'voltage_ubc': 'Ubc电压',
    'voltage_ubc_quality': 'Ubc电压质量码',
    'voltage_uca': 'Uca电压',
    'voltage_uca_quality': 'Uca电压质量码'
}

# ==========================================
# Utility functions
# ==========================================

def get_db_connection():
    try:
        conn = psycopg2.connect(**DB_CONFIG)
        conn.autocommit = False
        return conn
    except psycopg2.Error as e:
        log(f"FATAL: 数据库连接失败: {e}")
        sys.exit(1)

def parse_value(value, data_type='str'):
    if value is None: return None
    value = value.strip()
    if not value or value.lower() == 'null': return None
    
    try:
        if data_type == 'int':
            val = int(float(value))
            # Range check: the target columns are PostgreSQL INTEGER (32-bit)
            if val > 2147483647 or val < -2147483648:
                return None
            return val
        elif data_type == 'float':
            return float(value)
        else:
            return value
    except ValueError:
        return None

def log(msg):
    print(f"[{datetime.now().strftime('%H:%M:%S')}] {msg}", flush=True)

def insert_batch_with_retry(conn, cursor, sql, data, raw_lines=None):
    if not data: return

    for attempt in range(MAX_RETRIES):
        try:
            # execute_values batches rows into a multi-row INSERT for better performance
            psycopg2.extras.execute_values(
                cursor, sql, data, template=None, page_size=BATCH_SIZE
            )
            conn.commit()
            return
        except psycopg2.OperationalError as e:
            # Retry on deadlock / lock contention
            if 'deadlock' in str(e).lower() or 'lock' in str(e).lower():
                log(f"WARNING: lock conflict, retrying ({attempt+1}/{MAX_RETRIES})...")
                conn.rollback()
                time.sleep(2)
                continue
            else:
                raise e
        except psycopg2.DataError as e:
            # Bad data in the batch: fall back to row-by-row insert to isolate it
            log("ERROR: batch contains bad data, locating and discarding it...")
            conn.rollback()
            insert_one_by_one(conn, cursor, sql, data, raw_lines)
            return
        except Exception as e:
            log(f"ERROR: 未知写入错误: {e}")
            conn.rollback()
            raise e
            
    raise Exception(f"重试 {MAX_RETRIES} 次后失败")

def insert_one_by_one(conn, cursor, sql_template, data, raw_lines):
    """Insert rows one at a time and report exactly which rows are bad."""
    success_count = 0
    # Convert the multi-row INSERT template into a single-row INSERT,
    # deriving the placeholder count from the actual row width
    placeholders = '(' + ','.join(['%s'] * len(data[0])) + ')'
    single_sql = sql_template.replace('VALUES %s', f'VALUES {placeholders}')
    
    for i, row in enumerate(data):
        try:
            cursor.execute(single_sql, row)
            conn.commit()
            success_count += 1
        except Exception as e:
            # Report the failure reason and the offending data for easy debugging
            log(f"DATA_ERROR: row {i+1} failed to insert: {e}")
            log(f"    -> bad row: {row}")
            if raw_lines and i < len(raw_lines):
                log(f"    -> raw line content: {raw_lines[i]}")
            conn.rollback()

    log(f"INFO: row-by-row cleanup complete, {success_count}/{len(data)} succeeded")

# ==========================================
# Main logic
# ==========================================

def main():
    input_stream = sys.stdin
    
    record_time = None
    header_map = {}
    data_buffer = []
    raw_line_buffer = []  # keep raw lines so failures can echo the original input
    
    found_time = False
    found_header = False
    is_processing_data = False
    
    conn = get_db_connection()
    cursor = conn.cursor()

    # Multi-row INSERT template for execute_values
    sql = f"""
        INSERT INTO {TABLE_NAME} (
            record_time, station_name, feeder_gis_id, switch_name, switch_oid, switch_gis_id,
            switch_status, switch_status_quality,
            active_power, active_power_quality, reactive_power, reactive_power_quality,
            current_a, current_a_quality, current_b, current_b_quality, current_c, current_c_quality,
            voltage_uab, voltage_uab_quality, voltage_ubc, voltage_ubc_quality, voltage_uca, voltage_uca_quality
        ) VALUES %s
    """

    try:
        for line in input_stream:
            line = line.strip()
            if not line: continue

            if not found_time and line.startswith('<!'):
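                # The header line carries 可读时间='...' ("readable time");
                # the Chinese key is kept so the regex matches the raw file format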
                match = re.search(r"可读时间='([^']+)'", line)
                if match:
                    record_time = match.group(1).replace('_', ' ')
                    found_time = True
                    log(f"INFO: 解析到记录时间: {record_time}")
                continue

            if not found_header and line.startswith('@'):
                # Split the header with a regex, handling both tabs and runs of 2+ spaces
                headers = re.split(r'\s{2,}|\t', line[1:].strip())
                
                for idx, col_name in enumerate(headers):
                    header_map[col_name.strip()] = idx
                
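                # The Chinese header '开关OID' ("switch OID") must be present for the table to be usable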
                if '开关OID' in header_map:
                    found_header = True
                    is_processing_data = True
                    log(f"INFO: 表头解析成功,共 {len(headers)} 列 (智能分隔模式)")
                else:
                    log("WARNING: 表头不符合预期")
                continue

            if is_processing_data:
                if line.startswith('#'):
                    # Split the data row with the same regex:
                    # delimiters are tabs or runs of 2+ consecutive spaces
                    parts = re.split(r'\s{2,}|\t', line[1:].strip())
                    
                    try:
                        def get_val(field_key, type_cvt):
                            e_col_name = FIELD_MAPPING[field_key]
                            idx = header_map.get(e_col_name)
                            if idx is not None and idx < len(parts):
                                return parse_value(parts[idx], type_cvt)
                            return None

                        # switch_oid must not be empty (TimescaleDB segment-by key requirement)
                        switch_oid = get_val('switch_oid', 'str')
                        if not switch_oid:
                            continue  # skip records with an empty switch_oid

                        row = (
                            record_time,
                            get_val('station_name', 'str'),
                            get_val('feeder_gis_id', 'str'),
                            get_val('switch_name', 'str'),
                            switch_oid,
                            get_val('switch_gis_id', 'str'),
                            get_val('switch_status', 'int'),
                            get_val('switch_status_quality', 'int'),
                            get_val('active_power', 'float'),
                            get_val('active_power_quality', 'int'),
                            get_val('reactive_power', 'float'),
                            get_val('reactive_power_quality', 'int'),
                            get_val('current_a', 'float'),
                            get_val('current_a_quality', 'int'),
                            get_val('current_b', 'float'),
                            get_val('current_b_quality', 'int'),
                            get_val('current_c', 'float'),
                            get_val('current_c_quality', 'int'),
                            get_val('voltage_uab', 'float'),
                            get_val('voltage_uab_quality', 'int'),
                            get_val('voltage_ubc', 'float'),
                            get_val('voltage_ubc_quality', 'int'),
                            get_val('voltage_uca', 'float'),
                            get_val('voltage_uca_quality', 'int'),
                        )
                        data_buffer.append(row)
                        raw_line_buffer.append(line)  # buffer the raw line for error reporting

                        if len(data_buffer) >= BATCH_SIZE:
                            insert_batch_with_retry(conn, cursor, sql, data_buffer, raw_line_buffer)
                            log(f"INFO: 成功写入 {len(data_buffer)} 条记录")
                            data_buffer = []
                            raw_line_buffer = []
                            
                    except Exception as e:
                        log(f"WARNING: 数据行解析失败,跳过: {e}")
                        continue
                else:
                    log(f"SUCCESS: 表读取结束 (遇到 '{line[0]}')")
                    break

        if data_buffer:
            insert_batch_with_retry(conn, cursor, sql, data_buffer, raw_line_buffer)
            log(f"INFO: 剩余 {len(data_buffer)} 条入库完成")

    except Exception as e:
        log(f"FATAL: 处理中断: {e}")
        conn.rollback()
        sys.exit(1)
    finally:
        cursor.close()
        conn.close()

if __name__ == "__main__":
    main()

3. Deployment Notes

Install dependencies

# Install the PostgreSQL driver
pip install psycopg2-binary

# Or, to build psycopg2 from source:
sudo apt-get install libpq-dev
pip install psycopg2

Configure the database connection

Update the DB_CONFIG block in the script to match your actual TimescaleDB deployment:

DB_CONFIG = {
    "host": "your_timescaledb_host",
    "port": 5432,
    "user": "your_username", 
    "password": "your_password",
    "database": "your_database_name"
}
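
Run the script

The ETL script reads the EFile content from stdin, so pipe a data file into it. A sketch (the file and script names below are placeholders for your actual paths):

cat GZDY_20240101.dat | python3 dms_gzdy_etl.py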

4. Key Improvements

  1. Table schema optimization

    • TIMESTAMPTZ time column handles time zones automatically
    • record_time and switch_oid form the composite primary key required by TimescaleDB
    • Index design tuned for time-series query patterns
  2. TimescaleDB features

    • Hypertable partitioned by day
    • Compression segmented by device ID
    • Automatic compression after 3 days to save storage
  3. Script performance optimization

    • High-throughput batch inserts via psycopg2.extras.execute_values
    • Improved error handling and retry logic
    • Data type handling adapted to PostgreSQL
  4. Data quality safeguards

    • switch_oid guaranteed non-empty (required for the compression segment-by key)
    • Thorough exception handling and logging
    • Row-by-row fallback to isolate and discard bad data

This approach should deliver a substantial query-performance improvement: TimescaleDB is commonly reported to be 10-100x faster than MySQL for time-series workloads, though the actual gain depends on query patterns and data volume.