开篇:一个真实的数据清洗挑战
2024年1月,某新能源汽车品牌的运营总监王经理遇到了一个头疼的问题:
集团要求各区域提交Q4季度的门店运营数据分析报告。她手上有:
- 50家门店的3个月工单数据(150个Excel文件)
- 数据格式混乱,每个门店导出的格式都不一样
- 有大量重复记录、缺失值、错误数据
- 领导要求3天内完成分析
她的团队用传统Excel方式,预计需要2周时间。
后来,她找到了我。我用Pandas写了一个自动化脚本,2小时完成了所有数据清洗和分析。
今天,我将把这个完整的案例分享给你,让你看看Pandas在实际工作中的威力。
案例背景与数据情况
业务背景
某新能源汽车品牌,全国有50家门店,需要分析Q4季度(10-12月)的运营数据:
- 分析各门店的NPS表现
- 统计营收和客单价
- 识别优秀门店和需要改进的门店
- 生成战区对比报告
数据来源
每家门店每月提交一个Excel文件,包含:
- 工单编号
- 日期
- 客户姓名
- 车型
- 服务类型(保养/维修)
- 工时费
- 配件费
- 客户评分(1-5分)
数据问题
实际收到的150个文件存在以下问题:
问题1:格式不统一
- 有的文件列名是中文,有的是英文
- 有的日期格式是 2024-10-15,有的是 2024/10/15,还有的是 20241015
- 有的金额带逗号,有的不带
问题2:数据质量差
- 有重复工单(同一个工单出现多次)
- 有缺失值(客户姓名、车型、评分等)
- 有异常值(工时费为负数、评分超过5分等)
问题3:需要计算
- 需要计算每个工单的总金额
- 需要将1-5分评分转换为NPS(净推荐值)
- 需要按门店、月份、战区等多维度统计
第一步:批量读取数据
1. 导入必要的库
import pandas as pd
import numpy as np
import glob
import os
from datetime import datetime
import warnings
warnings.filterwarnings('ignore') # 忽略警告信息
2. 读取所有Excel文件
# 设置数据文件夹路径
data_folder = '门店数据/Q4'
# 获取所有xlsx文件
file_list = glob.glob(f'{data_folder}/*.xlsx')
print(f'找到 {len(file_list)} 个数据文件')
print('\n前5个文件:')
for f in file_list[:5]:
print(f' - {os.path.basename(f)}')
输出示例:
找到 150 个数据文件
前5个文件:
- 北京朝阳店_2024年10月.xlsx
- 北京朝阳店_2024年11月.xlsx
- 北京朝阳店_2024年12月.xlsx
- 上海浦东店_2024年10月.xlsx
- 上海浦东店_2024年11月.xlsx
3. 定义统一的列名映射
由于不同文件的列名不统一,我们需要先统一列名:
# 定义列名映射字典
column_mapping = {
# 中文列名
'工单编号': 'order_id',
'日期': 'date',
'客户姓名': 'customer_name',
'车型': 'car_model',
'服务类型': 'service_type',
'工时费': 'labor_fee',
'配件费': 'parts_fee',
'客户评分': 'rating',
# 英文列名(有些门店用英文)
'Order ID': 'order_id',
'Date': 'date',
'Customer': 'customer_name',
'Model': 'car_model',
'Service Type': 'service_type',
'Labor Fee': 'labor_fee',
'Parts Fee': 'parts_fee',
'Rating': 'rating'
}
4. 批量读取并合并
def read_store_data(file_path):
"""
读取单个门店数据文件
"""
try:
# 读取Excel文件
df = [pd.read](http://pd.read)_excel(file_path)
# 从文件名提取门店名称和月份
filename = os.path.basename(file_path)
# 假设文件名格式:门店名_2024年10月.xlsx
parts = filename.replace('.xlsx', '').split('_')
store_name = parts[0]
month = parts[1] if len(parts) > 1 else '未知'
# 添加门店和月份信息
df['门店'] = store_name
df['月份'] = month
df['文件名'] = filename
# 统一列名
df = df.rename(columns=column_mapping)
return df
except Exception as e:
print(f'读取文件失败: {file_path}')
print(f'错误信息: {str(e)}')
return None
# 读取所有文件
df_list = []
for i, file_path in enumerate(file_list, 1):
print(f'\r正在读取第 {i}/{len(file_list)} 个文件...', end='')
df_temp = read_store_data(file_path)
if df_temp is not None:
df_list.append(df_temp)
print('\n读取完成!')
# 合并所有数据
df_raw = pd.concat(df_list, ignore_index=True)
print(f'\n合并后的数据:')
print(f' 总行数: {len(df_raw):,}')
print(f' 总列数: {len(df_raw.columns)}')
print(f' 涉及门店数: {df_raw["门店"].nunique()}')
输出示例:
正在读取第 150/150 个文件...
读取完成!
合并后的数据:
总行数: 125,847
总列数: 11
涉及门店数: 50
第二步:数据探索与质量检查
1. 查看数据基本信息
# 查看前几行
print('数据预览:')
print(df_raw.head())
# 查看数据类型
print('\n数据类型:')
print(df_raw.dtypes)
# 查看基本统计信息
print('\n数值列统计:')
print(df_raw.describe())
2. 检查缺失值
# 统计缺失值
missing = df_raw.isnull().sum()
missing_pct = (missing / len(df_raw)) * 100
missing_summary = pd.DataFrame({
'缺失数量': missing,
'缺失比例': missing_pct.round(2)
})
print('\n缺失值统计:')
print(missing_summary[missing_summary['缺失数量'] > 0])
输出示例:
缺失值统计:
缺失数量 缺失比例
customer_name 1250 0.99%
car_model 2340 1.86%
rating 8760 6.96%
3. 检查重复值
# 检查重复工单
duplicate_orders = df_raw.duplicated(subset=['order_id']).sum()
print(f'\n重复工单数: {duplicate_orders:,} ({duplicate_orders/len(df_raw)*100:.2f}%)')
# 查看重复工单的详细信息
if duplicate_orders > 0:
duplicates = df_raw[df_raw.duplicated(subset=['order_id'], keep=False)]
print('\n重复工单示例:')
print(duplicates[['order_id', '门店', 'date', 'labor_fee']].head(10))
4. 检查数据异常
# 检查金额异常
print('\n工时费异常检查:')
print(f' 负值数量: {(df_raw["labor_fee"] < 0).sum()}')
print(f' 超大值(>10000)数量: {(df_raw["labor_fee"] > 10000).sum()}')
print(f' 为0数量: {(df_raw["labor_fee"] == 0).sum()}')
print('\n配件费异常检查:')
print(f' 负值数量: {(df_raw["parts_fee"] < 0).sum()}')
print(f' 超大值(>50000)数量: {(df_raw["parts_fee"] > 50000).sum()}')
# 检查评分异常
print('\n评分异常检查:')
print(f' 小于1分: {(df_raw["rating"] < 1).sum()}')
print(f' 大于5分: {(df_raw["rating"] > 5).sum()}')
print(f' 评分分布: ')
print(df_raw['rating'].value_counts().sort_index())
输出示例:
工时费异常检查:
负值数量: 23
超大值(>10000)数量: 5
为0数量: 1250
配件费异常检查:
负值数量: 15
超大值(>50000)数量: 2
评分异常检查:
小于1分: 45
大于5分: 12
评分分布:
1.0 5240
2.0 8630
3.0 28450
4.0 52380
5.0 22387
第三步:数据清洗
1. 处理日期格式
def clean_date(date_str):
"""
统一日期格式
"""
if pd.isna(date_str):
return None
# 尝试多种日期格式
date_formats = [
'%Y-%m-%d',
'%Y/%m/%d',
'%Y%m%d',
'%d/%m/%Y',
'%d-%m-%Y'
]
for fmt in date_formats:
try:
return [pd.to](http://pd.to)_datetime(date_str, format=fmt)
except:
continue
# 如果都失败,用pandas的智能解析
try:
return [pd.to](http://pd.to)_datetime(date_str)
except:
return None
# 应用日期清洗
print('正在清洗日期数据...')
df_raw['date_clean'] = df_raw['date'].apply(clean_date)
# 检查清洗结果
date_null = df_raw['date_clean'].isnull().sum()
print(f'无法解析的日期数量: {date_null}')
if date_null > 0:
print('\n无法解析的日期示例:')
print(df_raw[df_raw['date_clean'].isnull()]['date'].head())
2. 删除重复工单
print('\n删除重复工单...')
print(f'删除前: {len(df_raw):,} 行')
# 保留第一次出现的工单
df_clean = df_raw.drop_duplicates(subset=['order_id'], keep='first')
print(f'删除后: {len(df_clean):,} 行')
print(f'删除了: {len(df_raw) - len(df_clean):,} 行重复数据')
3. 处理缺失值
print('\n处理缺失值...')
# 对于客户姓名和车型的缺失值,用"未知"填充
df_clean['customer_name'] = df_clean['customer_name'].fillna('未知客户')
df_clean['car_model'] = df_clean['car_model'].fillna('未知车型')
# 对于金额的缺失值,用0填充
df_clean['labor_fee'] = df_clean['labor_fee'].fillna(0)
df_clean['parts_fee'] = df_clean['parts_fee'].fillna(0)
# 对于评分的缺失值,暂时不处理(后续分析时排除)
print(f'评分缺失值: {df_clean["rating"].isnull().sum()}')
4. 处理异常值
print('\n处理异常值...')
# 修正负值金额(可能是录入错误,取绝对值)
df_clean['labor_fee'] = df_clean['labor_fee'].abs()
df_clean['parts_fee'] = df_clean['parts_fee'].abs()
# 删除超大金额异常值(可能是录入错误)
print(f'删除前: {len(df_clean):,} 行')
df_clean = df_clean[
(df_clean['labor_fee'] <= 10000) &
(df_clean['parts_fee'] <= 50000)
]
print(f'删除后: {len(df_clean):,} 行')
# 修正评分异常值
df_clean.loc[df_clean['rating'] < 1, 'rating'] = 1
df_clean.loc[df_clean['rating'] > 5, 'rating'] = 5
print('\n异常值处理完成!')
5. 数据类型转换
print('\n转换数据类型...')
# 确保数值类型正确
df_clean['labor_fee'] = [pd.to](http://pd.to)_numeric(df_clean['labor_fee'], errors='coerce')
df_clean['parts_fee'] = [pd.to](http://pd.to)_numeric(df_clean['parts_fee'], errors='coerce')
df_clean['rating'] = [pd.to](http://pd.to)_numeric(df_clean['rating'], errors='coerce')
# 确保日期类型
df_clean['date_clean'] = [pd.to](http://pd.to)_datetime(df_clean['date_clean'], errors='coerce')
print('数据类型转换完成!')
print('\n最终数据类型:')
print(df_clean.dtypes)
第四步:特征工程
1. 计算总金额
# 计算每个工单的总金额
df_clean['total_amount'] = df_clean['labor_fee'] + df_clean['parts_fee']
print('总金额统计:')
print(df_clean['total_amount'].describe())
2. 提取时间特征
# 从日期中提取年、月、日、星期等
df_clean['年'] = df_clean['date_clean'].dt.year
df_clean['月'] = df_clean['date_clean'].dt.month
df_clean['日'] = df_clean['date_clean'].[dt.day](http://dt.day)
df_clean['星期'] = df_clean['date_clean'].dt.dayofweek # 0=周一, 6=周日
df_clean['是否周末'] = df_clean['星期'].isin([5, 6])
print('\n时间特征示例:')
print(df_clean[['date_clean', '年', '月', '日', '星期', '是否周末']].head())
3. 将评分转换为NPS分类
def rating_to_nps_category(rating):
"""
将1-5分评分转换为NPS分类
5分 = 推荐者
4分 = 中立者
1-3分 = 贬损者
"""
if pd.isna(rating):
return None
elif rating == 5:
return '推荐者'
elif rating == 4:
return '中立者'
else:
return '贬损者'
df_clean['NPS分类'] = df_clean['rating'].apply(rating_to_nps_category)
print('\nNPS分类分布:')
print(df_clean['NPS分类'].value_counts())
4. 添加门店战区信息
# 定义门店与战区的映射关系
store_to_region = {
'北京朝阳店': '华北战区',
'天津和平店': '华北战区',
'上海浦东店': '华东战区',
'杭州西湖店': '华东战区',
'深圳南山店': '华南战区',
'广州天河店': '华南战区',
# ... 其他门店映射
}
# 应用映射
df_clean['战区'] = df_clean['门店'].map(store_to_region)
# 检查未映射的门店
unmapped = df_clean[df_clean['战区'].isnull()]['门店'].unique()
if len(unmapped) > 0:
print(f'\n警告: 有 {len(unmapped)} 个门店未映射到战区:')
print(unmapped[:10])
第五步:数据验证与保存
1. 最终数据质量检查
print('\n' + '='*60)
print('最终数据质量报告')
print('='*60)
print(f'\n总数据量: {len(df_clean):,} 行')
print(f'涉及门店: {df_clean["门店"].nunique()} 家')
print(f'时间范围: {df_clean["date_clean"].min()} 至 {df_clean["date_clean"].max()}')
print('\n缺失值情况:')
missing_final = df_clean.isnull().sum()
print(missing_final[missing_final > 0])
print('\n数值统计:')
print(df_clean[['labor_fee', 'parts_fee', 'total_amount', 'rating']].describe())
print('\n数据清洗完成!✓')
2. 保存清洗后的数据
# 保存为Excel
output_file = f'门店数据_清洗后_{[datetime.now](http://datetime.now)().strftime("%Y%m%d")}.xlsx'
df_[clean.to](http://clean.to)_excel(output_file, index=False)
print(f'\n数据已保存至: {output_file}')
# 也可以保存为CSV(速度更快,文件更小)
csv_file = output_file.replace('.xlsx', '.csv')
df_[clean.to](http://clean.to)_csv(csv_file, index=False, encoding='utf-8-sig')
print(f'CSV文件已保存至: {csv_file}')
关键收获
通过这个实战案例,你学会了:
✅ 批量读取:使用glob一次性读取多个文件
✅ 列名统一:处理不同格式的列名映射
✅ 日期处理:统一多种日期格式
✅ 缺失值处理:根据业务逻辑选择填充策略
✅ 重复值处理:识别并删除重复数据
✅ 异常值处理:发现并修正数据异常
✅ 特征工程:从原始数据提取有价值的特征
✅ 数据验证:全流程质量检查
在下一节(Day 38-6),我们将基于清洗后的数据,进行多维度分析和洞察提取。
你准备好了吗?让我们继续! ?