运行环境
ArcGIS 10.2 + Python 2.7
目录结构
├── cad_to_shp.py    # 代码文件
├── dwg              # 输入目录(CAD 目录)
│   ├── a.dwg
│   └── b.dxf
└── shp              # 输出目录(shp 目录)
    ├── a.shp
    └── b.shp
代码
# coding=UTF-8
import arcpy
import os
import sys
from arcpy import env
# --- Geoprocessing environment ------------------------------------------
# Existing outputs are overwritten instead of raising an error.
env.overwriteOutput = True
env.workspace = u'e:\\cad'

# --- Input / output locations -------------------------------------------
# Folder scanned for CAD drawings and folder that receives the shapefiles.
cad_dir = u'E:\\cad\\dwg\\dwg_20250904\\dwg01'
shp_dir = u'E:\\cad\\shp\\shp_20250904\\shp01'
if not os.path.exists(shp_dir):
    os.makedirs(shp_dir)

# Scratch file geodatabase, created inside the workspace for each drawing.
temp_gdb = "temp.gdb"
gdb_path = os.path.join(env.workspace, temp_gdb)

# --- Run-time bookkeeping -----------------------------------------------
shp_list = []  # paths of every shapefile that will be merged at the end
dwg_count = 0  # number of CAD files encountered so far
def _shp_text(value):
    """Return *value* as text without raising on non-ASCII content.

    Under Python 2, mixing byte strings and unicode in ``str.format`` raises
    UnicodeEncodeError/UnicodeDecodeError, so every message argument is
    normalised to text first. Byte strings are decoded with the file-system
    encoding using "replace"; the real encoding of arcpy error messages is not
    known here, so undecodable bytes degrade to placeholders instead of
    crashing.
    """
    text_type = type(u"")
    if isinstance(value, text_type):
        return value
    fallback_encoding = sys.getfilesystemencoding() or "utf-8"
    if isinstance(value, bytes):
        return value.decode(fallback_encoding, "replace")
    try:
        return text_type(value)
    except UnicodeError:
        # e.g. an exception whose message is a non-ASCII byte string
        return bytes(value).decode(fallback_encoding, "replace")


def _shp_print(template, *args):
    """Format the unicode *template* with *args* and print it safely."""
    message = template.format(*[_shp_text(arg) for arg in args])
    if str is bytes:
        # Python 2: encode explicitly so that a redirected stdout (encoding
        # None -> ASCII) cannot raise while an error is being reported.
        encoding = getattr(sys.stdout, "encoding", None) or "utf-8"
        message = message.encode(encoding, "replace")
    print(message)


def process_shapefile(shp_path, prj_shp_path, dwg_name):
    """Define the projection, project the shapefile and tag it with its source name.

    Args:
        shp_path: Path of the shapefile exported from the CAD drawing. Its
            coordinate system definition is overwritten in place.
        prj_shp_path: Path of the projected shapefile to create.
        dwg_name: Name of the source drawing; written to the ``XMMC`` and
            ``DKMC`` text fields of every feature in the projected shapefile.

    Returns:
        ``prj_shp_path``.

    Raises:
        Exception: Any error raised by arcpy is re-raised after both
            ``shp_path`` and ``prj_shp_path`` have been deleted.
    """
    try:
        # WKID 4528 -- presumably CGCS2000 3-degree Gauss-Kruger zone 40;
        # confirm against the projects this script is used for.
        spatial_ref = arcpy.SpatialReference(4528)

        # Define the projection of the raw export.
        arcpy.DefineProjection_management(shp_path, spatial_ref)

        # Project into the output shapefile.
        # NOTE(review): source and target use the same WKID, so this step does
        # not change coordinates -- confirm that this is intended.
        arcpy.Project_management(
            shp_path,
            prj_shp_path,
            spatial_ref,
            transform_method=""
        )

        # Add the attribute fields and fill them in.
        arcpy.AddField_management(prj_shp_path, "XMMC", "TEXT", field_length=255)
        arcpy.AddField_management(prj_shp_path, "DKMC", "TEXT", field_length=255)

        # Write the value through a cursor instead of building a Python
        # expression for CalculateField: a drawing name containing a quote
        # would break such an expression, and formatting a non-ASCII unicode
        # name into a byte string raises UnicodeEncodeError under Python 2.
        with arcpy.da.UpdateCursor(prj_shp_path, ["XMMC", "DKMC"]) as cursor:
            for row in cursor:
                row[0] = dwg_name
                row[1] = dwg_name
                cursor.updateRow(row)

        return prj_shp_path
    except Exception as e:
        _shp_print(u"Error processing shapefile {}: {}", dwg_name, e)
        # Remove the files left behind by the failed run.
        if arcpy.Exists(shp_path):
            arcpy.Delete_management(shp_path)
        if arcpy.Exists(prj_shp_path):
            arcpy.Delete_management(prj_shp_path)
        raise  # re-raise so that the caller can handle the failure
def _to_text(value):
    """Return *value* as text without raising on non-ASCII content.

    Under Python 2, mixing byte strings and unicode in ``str.format`` raises
    UnicodeEncodeError/UnicodeDecodeError. ``os.listdir`` is called with a
    unicode path and therefore yields unicode names, so every message argument
    is normalised to text before formatting. Byte strings are decoded with the
    file-system encoding using "replace".
    """
    text_type = type(u"")
    if isinstance(value, text_type):
        return value
    fallback_encoding = sys.getfilesystemencoding() or "utf-8"
    if isinstance(value, bytes):
        return value.decode(fallback_encoding, "replace")
    try:
        return text_type(value)
    except UnicodeError:
        # e.g. an exception whose message is a non-ASCII byte string
        return bytes(value).decode(fallback_encoding, "replace")


def _log(template, *args):
    """Format the unicode *template* with *args* and print it safely."""
    message = template.format(*[_to_text(arg) for arg in args])
    if str is bytes:
        # Python 2: encode explicitly so that a redirected stdout (encoding
        # None -> ASCII) cannot raise in the middle of the batch.
        encoding = getattr(sys.stdout, "encoding", None) or "utf-8"
        message = message.encode(encoding, "replace")
    print(message)


def _feature_count(feature_class):
    """Return the number of features in *feature_class*, or 0 if it does not exist."""
    if not arcpy.Exists(feature_class):
        return 0
    return int(arcpy.GetCount_management(feature_class).getOutput(0))


# Convert every CAD drawing in cad_dir into a projected shapefile in shp_dir.
for dwg_file in os.listdir(cad_dir):
    # Compare case-insensitively so that "A.DWG" / "B.DXF" are not skipped.
    if not dwg_file.lower().endswith(('.dwg', '.dxf')):
        continue

    dwg_count += 1
    dwg_path = os.path.join(cad_dir, dwg_file)
    dwg_name = os.path.splitext(dwg_file)[0]
    shp_name = dwg_name + ".shp"
    prj_shp_name = "prj_" + shp_name
    shp_path = os.path.join(shp_dir, shp_name)
    prj_shp_path = os.path.join(shp_dir, prj_shp_name)

    # Skip drawings whose projected shapefile already exists.
    if os.path.exists(prj_shp_path):
        # "a.dwg" and "a.dxf" map to the same output; list it only once so
        # that its features are not duplicated by the final merge.
        if prj_shp_path not in shp_list:
            shp_list.append(prj_shp_path)
        _log(u"{}: SHP file {} already exists, skipping.", dwg_count, prj_shp_name)
        continue

    converted = False
    temp_line_shp = None  # path of the temporary polyline shapefile, if any
    try:
        # Remove a scratch geodatabase left over from a previous drawing.
        if arcpy.Exists(gdb_path):
            arcpy.Delete_management(gdb_path)
            if arcpy.Exists(gdb_path):
                _log(u"Warning: temp_gdb {} deleted failed", gdb_path)

        # Create the scratch geodatabase and import the drawing into it.
        arcpy.CreateFileGDB_management(env.workspace, temp_gdb)
        cad_feature_dataset = os.path.join(gdb_path, "CAD_dataset")
        arcpy.CADToGeodatabase_conversion(dwg_path, gdb_path, "CAD_dataset", reference_scale=1000)

        # 1. Prefer the polygon features of the drawing.
        polygon_feature = os.path.join(cad_feature_dataset, "Polygon")
        if _feature_count(polygon_feature) > 0:
            arcpy.FeatureClassToFeatureClass_conversion(polygon_feature, shp_dir, shp_name)
            converted = True

        # 2. Without polygons, build polygons from the polyline features.
        if not converted:
            polyline_feature = os.path.join(cad_feature_dataset, "Polyline")
            if _feature_count(polyline_feature) > 0:
                # Use the same file name for writing and reading. Deriving
                # the output name with split('.')[0] truncated drawing names
                # that contain a dot, so the export landed in another file.
                temp_line_name = dwg_name + "temp_line.shp"
                temp_line_shp = os.path.join(shp_dir, temp_line_name)
                arcpy.FeatureClassToFeatureClass_conversion(polyline_feature, shp_dir, temp_line_name)
                arcpy.FeatureToPolygon_management(temp_line_shp, shp_path)
                if _feature_count(shp_path) > 0:
                    converted = True
                else:
                    _log(u"{}: Warning: Polyline to Polygon failed: {}", dwg_count, dwg_file)
                    if arcpy.Exists(shp_path):
                        arcpy.Delete_management(shp_path)

        # 3. Project the result and add the attribute fields.
        if converted:
            prj_shp_path = process_shapefile(shp_path, prj_shp_path, dwg_name)
            shp_list.append(prj_shp_path)
            _log(u"{}: Convert successfully: {}", dwg_count, dwg_file)
        else:
            _log(u"{}: Warning: No valid elements: {}", dwg_count, dwg_file)
    except Exception as e:
        # One broken drawing must not abort the whole batch.
        _log(u"{}: Warning: Processing failed: {}, {}", dwg_count, dwg_file, e)
    finally:
        # Always remove temporary files, whether the conversion worked or not.
        if temp_line_shp and arcpy.Exists(temp_line_shp):
            arcpy.Delete_management(temp_line_shp)
        if arcpy.Exists(shp_path) and not converted:
            arcpy.Delete_management(shp_path)
        if arcpy.Exists(prj_shp_path) and not converted:
            arcpy.Delete_management(prj_shp_path)
        # Always try to drop the scratch geodatabase.
        if arcpy.Exists(gdb_path):
            try:
                arcpy.Delete_management(gdb_path)
            except Exception:
                _log(u"Warning: temp.gdb{} deleted failed", gdb_path)
# Merge every successfully produced shapefile into a single output.
if not shp_list:
    print("Warning: No valid files to be merged!")
else:
    merged_shp = os.path.join(shp_dir, "merge.shp")
    try:
        arcpy.Merge_management(shp_list, merged_shp)
        merged_total = len(shp_list)
        print("Merge successfully {} files to: {}".format(merged_total, merged_shp))
        # Drawings that were counted but never produced a shapefile.
        failed_total = dwg_count - len(shp_list)
        print("{} files converted failed. ".format(failed_total))
    except Exception as merge_error:
        print("Warning: Merge failed: {}".format(str(merge_error)))