-- Create the TimescaleDB table structure
CREATE TABLE dms_data_gzdy (
    record_time TIMESTAMPTZ NOT NULL, -- TIMESTAMPTZ serves as the time partitioning key
    station_name VARCHAR(255),
    feeder_gis_id VARCHAR(100),
    switch_name VARCHAR(255),
    switch_oid VARCHAR(100) NOT NULL, -- compression segment-by key; must not be NULL
    switch_gis_id VARCHAR(100),
    switch_status INTEGER,
    switch_status_quality INTEGER,
    active_power DECIMAL(18,6),
    active_power_quality INTEGER,
    reactive_power DECIMAL(18,6),
    reactive_power_quality INTEGER,
    current_a DECIMAL(18,6),
    current_a_quality INTEGER,
    current_b DECIMAL(18,6),
    current_b_quality INTEGER,
    current_c DECIMAL(18,6),
    current_c_quality INTEGER,
    voltage_uab DECIMAL(18,6),
    voltage_uab_quality INTEGER,
    voltage_ubc DECIMAL(18,6),
    voltage_ubc_quality INTEGER,
    voltage_uca DECIMAL(18,6),
    voltage_uca_quality INTEGER,
    created_at TIMESTAMPTZ DEFAULT NOW(),
    -- TimescaleDB requires the time column in the primary key
    PRIMARY KEY (record_time, switch_oid)
);
-- Create the hypertable (one chunk per day)
SELECT create_hypertable('dms_data_gzdy', 'record_time', chunk_time_interval => INTERVAL '1 day');
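To confirm the hypertable was created and watch chunks appear as data arrives, the information views can be queried. A quick sanity check, assuming TimescaleDB 2.x where these views and show_chunks are available:
-- Sanity check: the hypertable should be listed; chunks appear as data arrives
SELECT hypertable_name, num_chunks FROM timescaledb_information.hypertables;
SELECT show_chunks('dms_data_gzdy');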
-- Configure compression
ALTER TABLE dms_data_gzdy SET (
    timescaledb.compress,
    -- Segment by device ID so queries for a single device read only its compressed batches
    timescaledb.compress_segmentby = 'switch_oid',
    -- Order by time within each segment so decompressed rows come back in time order
    timescaledb.compress_orderby = 'record_time ASC'
);
-- Compress chunks automatically once they are older than 3 days
SELECT add_compression_policy('dms_data_gzdy', INTERVAL '3 days');
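Once the policy has compressed some chunks, its effect can be inspected with the built-in stats function (again assuming TimescaleDB 2.x):
-- Compare on-disk size before and after compression
SELECT total_chunks,
       number_compressed_chunks,
       pg_size_pretty(before_compression_total_bytes) AS before,
       pg_size_pretty(after_compression_total_bytes) AS after
FROM hypertable_compression_stats('dms_data_gzdy');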
-- Indexes to speed up the most common lookups
CREATE INDEX idx_dms_switch_oid ON dms_data_gzdy (switch_oid, record_time);
CREATE INDEX idx_dms_feeder_gis_id ON dms_data_gzdy (feeder_gis_id, record_time);
CREATE INDEX idx_dms_station_name ON dms_data_gzdy (station_name, record_time);
-- Table and column comments
COMMENT ON TABLE dms_data_gzdy IS 'DMS EFile data table (GZDY), TimescaleDB version';
COMMENT ON COLUMN dms_data_gzdy.record_time IS 'Record time (readable timestamp from the file header)';
COMMENT ON COLUMN dms_data_gzdy.station_name IS 'Station name';
COMMENT ON COLUMN dms_data_gzdy.feeder_gis_id IS 'Feeder GIS ID';
COMMENT ON COLUMN dms_data_gzdy.switch_name IS 'Switch name';
COMMENT ON COLUMN dms_data_gzdy.switch_oid IS 'Switch OID';
COMMENT ON COLUMN dms_data_gzdy.switch_gis_id IS 'Switch GIS ID';
COMMENT ON COLUMN dms_data_gzdy.switch_status IS 'Switch status';
COMMENT ON COLUMN dms_data_gzdy.switch_status_quality IS 'Switch status quality code';
COMMENT ON COLUMN dms_data_gzdy.active_power IS 'Active power';
COMMENT ON COLUMN dms_data_gzdy.active_power_quality IS 'Active power measurement quality code';
COMMENT ON COLUMN dms_data_gzdy.reactive_power IS 'Reactive power';
COMMENT ON COLUMN dms_data_gzdy.reactive_power_quality IS 'Reactive power measurement quality code';
COMMENT ON COLUMN dms_data_gzdy.current_a IS 'Phase A current';
COMMENT ON COLUMN dms_data_gzdy.current_a_quality IS 'Phase A current quality code';
COMMENT ON COLUMN dms_data_gzdy.current_b IS 'Phase B current';
COMMENT ON COLUMN dms_data_gzdy.current_b_quality IS 'Phase B current quality code';
COMMENT ON COLUMN dms_data_gzdy.current_c IS 'Phase C current';
COMMENT ON COLUMN dms_data_gzdy.current_c_quality IS 'Phase C current quality code';
COMMENT ON COLUMN dms_data_gzdy.voltage_uab IS 'Uab voltage';
COMMENT ON COLUMN dms_data_gzdy.voltage_uab_quality IS 'Uab voltage quality code';
COMMENT ON COLUMN dms_data_gzdy.voltage_ubc IS 'Ubc voltage';
COMMENT ON COLUMN dms_data_gzdy.voltage_ubc_quality IS 'Ubc voltage quality code';
COMMENT ON COLUMN dms_data_gzdy.voltage_uca IS 'Uca voltage';
COMMENT ON COLUMN dms_data_gzdy.voltage_uca_quality IS 'Uca voltage quality code';
COMMENT ON COLUMN dms_data_gzdy.created_at IS 'Row insertion time';
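A representative query this layout is tuned for aggregates one device over a time window, touching only that device's segment of the compressed data. The OID, bucket width, and window below are placeholders:
-- Hourly average phase-A current for one switch over the last 7 days
SELECT time_bucket('1 hour', record_time) AS bucket,
       avg(current_a) AS avg_current_a
FROM dms_data_gzdy
WHERE switch_oid = 'your_switch_oid'
  AND record_time > NOW() - INTERVAL '7 days'
GROUP BY bucket
ORDER BY bucket;
The Python script below streams an EFile from stdin, parses the header and data rows, and bulk-inserts them into this table: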
import sys
import re
import psycopg2
import psycopg2.extras
from datetime import datetime
import time
# ==========================================
# Configuration
# ==========================================
DB_CONFIG = {
    "host": "127.0.0.1",
    "port": 5432,
    "user": "smart_dms_user",
    "password": "***",
    "database": "smart_dms_db"
}
TABLE_NAME = "dms_data_gzdy"
BATCH_SIZE = 500
MAX_RETRIES = 3
# Maps table columns to the (Chinese) header names used in the source file.
# The values are matched verbatim against the file's header row, so they
# must remain in the source language.
FIELD_MAPPING = {
    'station_name': '厂站名',
    'feeder_gis_id': '馈线GISID',
    'switch_name': '开关名',
    'switch_oid': '开关OID',
    'switch_gis_id': '开关GISID',
    'switch_status': '开关状态',
    'switch_status_quality': '开关状态质量码',
    'active_power': '有功',
    'active_power_quality': '有功量测质量码',
    'reactive_power': '无功',
    'reactive_power_quality': '无功量测质量码',
    'current_a': 'A相电流',
    'current_a_quality': 'A相电流质量码',
    'current_b': 'B相电流',
    'current_b_quality': 'B相电流质量码',
    'current_c': 'C相电流',
    'current_c_quality': 'C相电流质量码',
    'voltage_uab': 'Uab电压',
    'voltage_uab_quality': 'Uab电压质量码',
    'voltage_ubc': 'Ubc电压',
    'voltage_ubc_quality': 'Ubc电压质量码',
    'voltage_uca': 'Uca电压',
    'voltage_uca_quality': 'Uca电压质量码'
}
# ==========================================
# Helper functions
# ==========================================
def get_db_connection():
    try:
        conn = psycopg2.connect(**DB_CONFIG)
        conn.autocommit = False
        return conn
    except psycopg2.Error as e:
        log(f"FATAL: database connection failed: {e}")
        sys.exit(1)
def parse_value(value, data_type='str'):
    if value is None:
        return None
    value = value.strip()
    if not value or value.lower() == 'null':
        return None
    try:
        if data_type == 'int':
            val = int(float(value))
            # Range check for PostgreSQL BIGINT
            if val > 9223372036854775807 or val < -9223372036854775808:
                return None
            return val
        elif data_type == 'float':
            return float(value)
        else:
            return value
    except (ValueError, OverflowError):
        # OverflowError covers int(float('inf')); treat any unparsable value as NULL
        return None
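# Illustrative behaviour of parse_value (example inputs, not taken from real data):
#   parse_value('  12.5 ', 'float') -> 12.5
#   parse_value('3.0', 'int')       -> 3
#   parse_value('NULL', 'str')      -> None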
def log(msg):
    print(f"[{datetime.now().strftime('%H:%M:%S')}] {msg}", flush=True)
def insert_batch_with_retry(conn, cursor, sql, data, raw_lines=None):
    if not data:
        return
    for attempt in range(MAX_RETRIES):
        try:
            # execute_values batches rows into a single multi-row INSERT,
            # which is far faster than row-by-row execute()
            psycopg2.extras.execute_values(
                cursor, sql, data, template=None, page_size=BATCH_SIZE
            )
            conn.commit()
            return
        except psycopg2.OperationalError as e:
            # Deadlocks / lock timeouts are worth retrying
            if 'deadlock' in str(e).lower() or 'lock' in str(e).lower():
                log(f"WARNING: lock conflict, retrying ({attempt+1}/{MAX_RETRIES})...")
                conn.rollback()
                time.sleep(2)
                continue
            else:
                raise
        except psycopg2.DataError:
            # The batch contains bad data: fall back to row-by-row inserts
            # to locate and discard the offending rows
            log("ERROR: batch contains bad data; locating and discarding offending rows...")
            conn.rollback()
            insert_one_by_one(conn, cursor, sql, data, raw_lines)
            return
        except Exception as e:
            log(f"ERROR: unexpected write error: {e}")
            conn.rollback()
            raise
    raise Exception(f"failed after {MAX_RETRIES} retries")
def insert_one_by_one(conn, cursor, sql_template, data, raw_lines):
    """Insert rows one at a time and report exactly which row is bad."""
    success_count = 0
    # Turn the multi-row template into a single-row INSERT; the placeholder
    # count is derived from the row width so it cannot drift out of sync
    placeholders = '(' + ','.join(['%s'] * len(data[0])) + ')'
    single_sql = sql_template.replace('VALUES %s', 'VALUES ' + placeholders)
    for i, row in enumerate(data):
        try:
            cursor.execute(single_sql, row)
            conn.commit()
            success_count += 1
        except Exception as e:
            # Log the failure reason plus the parsed and raw data for debugging
            log(f"DATA_ERROR: row {i+1} failed to insert: {e}")
            log(f" -> offending row: {row}")
            if raw_lines and i < len(raw_lines):
                log(f" -> raw line: {raw_lines[i]}")
            conn.rollback()
    log(f"INFO: row-by-row cleanup done, {success_count}/{len(data)} succeeded")
# ==========================================
# Main logic
# ==========================================
def main():
    input_stream = sys.stdin
    record_time = None
    header_map = {}
    data_buffer = []
    raw_line_buffer = []  # keep raw lines so errors can echo the original input
    found_time = False
    found_header = False
    is_processing_data = False
    conn = get_db_connection()
    cursor = conn.cursor()
    # Multi-row INSERT template for psycopg2.extras.execute_values
    sql = f"""
        INSERT INTO {TABLE_NAME} (
            record_time, station_name, feeder_gis_id, switch_name, switch_oid, switch_gis_id,
            switch_status, switch_status_quality,
            active_power, active_power_quality, reactive_power, reactive_power_quality,
            current_a, current_a_quality, current_b, current_b_quality, current_c, current_c_quality,
            voltage_uab, voltage_uab_quality, voltage_ubc, voltage_ubc_quality, voltage_uca, voltage_uca_quality
        ) VALUES %s
    """
    try:
        for line in input_stream:
            line = line.strip()
            if not line:
                continue
            if not found_time and line.startswith('<!'):
                # The file header carries a human-readable timestamp
                # (date and time joined by '_'); the pattern must stay in
                # the source language to match the file
                match = re.search(r"可读时间='([^']+)'", line)
                if match:
                    record_time = match.group(1).replace('_', ' ')
                    found_time = True
                    log(f"INFO: parsed record time: {record_time}")
                continue
            if not found_header and line.startswith('@'):
                # Split the header row on tabs or runs of 2+ spaces
                headers = re.split(r'\s{2,}|\t', line[1:].strip())
                for idx, col_name in enumerate(headers):
                    header_map[col_name.strip()] = idx
                if '开关OID' in header_map:  # the switch OID column is mandatory
                    found_header = True
                    is_processing_data = True
                    log(f"INFO: header parsed, {len(headers)} columns (flexible-delimiter mode)")
                else:
                    log("WARNING: header does not match expectations")
                continue
            if is_processing_data:
                if line.startswith('#'):
                    # Split data rows the same way as the header:
                    # on tabs or on 2+ consecutive spaces
                    parts = re.split(r'\s{2,}|\t', line[1:].strip())
                    try:
                        def get_val(field_key, type_cvt):
                            col_name = FIELD_MAPPING[field_key]
                            idx = header_map.get(col_name)
                            if idx is not None and idx < len(parts):
                                return parse_value(parts[idx], type_cvt)
                            return None
                        # switch_oid must not be empty: it is the TimescaleDB
                        # compression segment-by key
                        switch_oid = get_val('switch_oid', 'str')
                        if not switch_oid:
                            continue  # skip rows with no switch_oid
                        row = (
                            record_time,
                            get_val('station_name', 'str'),
                            get_val('feeder_gis_id', 'str'),
                            get_val('switch_name', 'str'),
                            switch_oid,
                            get_val('switch_gis_id', 'str'),
                            get_val('switch_status', 'int'),
                            get_val('switch_status_quality', 'int'),
                            get_val('active_power', 'float'),
                            get_val('active_power_quality', 'int'),
                            get_val('reactive_power', 'float'),
                            get_val('reactive_power_quality', 'int'),
                            get_val('current_a', 'float'),
                            get_val('current_a_quality', 'int'),
                            get_val('current_b', 'float'),
                            get_val('current_b_quality', 'int'),
                            get_val('current_c', 'float'),
                            get_val('current_c_quality', 'int'),
                            get_val('voltage_uab', 'float'),
                            get_val('voltage_uab_quality', 'int'),
                            get_val('voltage_ubc', 'float'),
                            get_val('voltage_ubc_quality', 'int'),
                            get_val('voltage_uca', 'float'),
                            get_val('voltage_uca_quality', 'int'),
                        )
                        data_buffer.append(row)
                        raw_line_buffer.append(line)  # keep the raw line for error reporting
                        if len(data_buffer) >= BATCH_SIZE:
                            insert_batch_with_retry(conn, cursor, sql, data_buffer, raw_line_buffer)
                            log(f"INFO: wrote {len(data_buffer)} rows")
                            data_buffer = []
                            raw_line_buffer = []
                    except Exception as e:
                        log(f"WARNING: failed to parse data row, skipping: {e}")
                        continue
                else:
                    log(f"SUCCESS: finished reading table (hit '{line[0]}')")
                    break
        if data_buffer:
            insert_batch_with_retry(conn, cursor, sql, data_buffer, raw_line_buffer)
            log(f"INFO: flushed the remaining {len(data_buffer)} rows")
    except Exception as e:
        log(f"FATAL: processing aborted: {e}")
        conn.rollback()
        sys.exit(1)
    finally:
        cursor.close()
        conn.close()
if __name__ == "__main__":
    main()
# Install the PostgreSQL driver
pip install psycopg2-binary
# Or, to build it from source instead
sudo apt-get install libpq-dev
pip install psycopg2
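The importer reads the EFile from standard input, so a typical run pipes the file into the script (both the file and script names here are illustrative):
# Run the importer against one EFile
cat GZDY_20240101.efile | python import_gzdy.py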
Adjust DB_CONFIG in the script to match your actual TimescaleDB deployment:
DB_CONFIG = {
    "host": "your_timescaledb_host",
    "port": 5432,
    "user": "your_username",
    "password": "your_password",
    "database": "your_database_name"
}
Schema design:
- TIMESTAMPTZ as the time column handles time zones automatically
- record_time and switch_oid form a composite primary key, satisfying TimescaleDB's requirement that the time column appear in the primary key
TimescaleDB features:
- Daily chunks, compressed automatically after 3 days and segmented by switch_oid
Script performance:
- psycopg2.extras.execute_values for high-throughput batch inserts
Data quality:
- switch_oid is guaranteed non-empty (required because it is the compression segment-by key)
This setup should improve query performance significantly; on time-series workloads TimescaleDB is typically 10-100x faster than MySQL.