运行环境

ArcGIS10.2 + Python2.7

目录结构

├—— cad_to_shp.py       # 代码文件

├—— dwg                 # 输入目录(CAD目录)

│          └─ a.dwg

│          └─ b.dxf

├—— shp                 # 输出目录(shp目录)

│          └─ a.shp

│          └─ b.shp

代码

# coding=UTF-8
import arcpy
import os
import sys
from arcpy import env

env.workspace = u'e:\\cad'
env.overwriteOutput = True
cad_dir = u'E:\\cad\\dwg\\dwg_20250904\\dwg01'
shp_dir = u'E:\\cad\\shp\\shp_20250904\\shp01'
temp_gdb = "temp.gdb"
gdb_path = os.path.join(env.workspace, temp_gdb)
shp_list = []  # 用于存储所有待合并的shp文件路径
dwg_count = 0

if not os.path.exists(shp_dir):
    os.makedirs(shp_dir)   

def process_shapefile(shp_path, prj_shp_path, dwg_name):
    """处理投影转换和字段添加的通用函数"""
    try:
        # 定义投影
        arcpy.DefineProjection_management(shp_path, arcpy.SpatialReference(4528))
        # 投影转换
        arcpy.Project_management(
            shp_path,
            prj_shp_path,
            arcpy.SpatialReference(4528),
            transform_method=""
        )                
        # 新增字段并赋值
        arcpy.AddField_management(prj_shp_path, "XMMC", "TEXT", field_length=255)
        arcpy.AddField_management(prj_shp_path, "DKMC", "TEXT", field_length=255)
        arcpy.CalculateField_management(prj_shp_path, "XMMC", "'{}'".format(dwg_name), "PYTHON")
        arcpy.CalculateField_management(prj_shp_path, "DKMC", "'{}'".format(dwg_name), "PYTHON")
        return prj_shp_path
    except Exception as e:
        print("Error processing shapefile {}: {}".format(dwg_name, str(e)))
        # 清理失败的文件
        if arcpy.Exists(shp_path):
            arcpy.Delete_management(shp_path)
        if arcpy.Exists(prj_shp_path):
            arcpy.Delete_management(prj_shp_path)
        raise  # 重新抛出异常,让上层处理

for dwg_file in os.listdir(cad_dir):
    if dwg_file.endswith(('.dwg', '.dxf')):
        dwg_count += 1
        dwg_path = os.path.join(cad_dir, dwg_file)        
        dwg_name = os.path.splitext(dwg_file)[0]
        shp_name = dwg_name + ".shp"
        prj_shp_name = "prj_" + shp_name
        shp_path = os.path.join(shp_dir, shp_name)    
        prj_shp_path = os.path.join(shp_dir, prj_shp_name)
        # ===== 新增:检查目标SHP文件是否已存在 =====
        if os.path.exists(prj_shp_path):            
            shp_list.append(prj_shp_path)
            print "{}: SHP file {} already exists, skipping.".format(dwg_count,prj_shp_name)
            continue  # 跳过当前文件,处理下一个DWG
        
        
        converted = False
        temp_line_shp = None  # 初始化临时线文件路径

        try:
            # 清理旧临时GDB
            if arcpy.Exists(gdb_path):
                arcpy.Delete_management(gdb_path)
                if arcpy.Exists(gdb_path):
                    print "Warning: temp_gdb {} deleted failed".format(gdb_path)

            # 创建临时GDB和CAD要素数据集
            arcpy.CreateFileGDB_management(env.workspace, temp_gdb)
            cad_feature_dataset = os.path.join(gdb_path, "CAD_dataset")
            arcpy.CADToGeodatabase_conversion(dwg_path, gdb_path, "CAD_dataset", reference_scale=1000)

            # 1. 优先处理面状要素
            polygon_feature = os.path.join(cad_feature_dataset, "Polygon")
            if arcpy.Exists(polygon_feature) and int(arcpy.GetCount_management(polygon_feature).getOutput(0)) > 0:
                arcpy.FeatureClassToFeatureClass_conversion(polygon_feature, shp_dir, shp_name)
                converted = True

            # 2. 面状要素为空时处理线状要素
            if not converted:
                polyline_feature = os.path.join(cad_feature_dataset, "Polyline")
                if arcpy.Exists(polyline_feature) and int(arcpy.GetCount_management(polyline_feature).getOutput(0)) > 0:
                    temp_line_shp = os.path.join(shp_dir, dwg_name+"temp_line.shp")
                    arcpy.FeatureClassToFeatureClass_conversion(polyline_feature, shp_dir, os.path.basename(temp_line_shp).split('.')[0])
                    arcpy.FeatureToPolygon_management(temp_line_shp, shp_path)
                    if int(arcpy.GetCount_management(shp_path).getOutput(0)) > 0:
                        converted = True
                    else:
                        print "{}: Warning: Polyline to Polygon failed: {}".format(dwg_count,dwg_file)
                        arcpy.Delete_management(shp_path)

            # 3. 处理成功后执行投影和字段添加
            if converted:
                prj_shp_path = process_shapefile(shp_path, prj_shp_path, dwg_name)
                shp_list.append(prj_shp_path)
                print "{}: Convert successfully: {}".format(dwg_count,dwg_file)
            else:
                print "{}: Warning: No valid elements: {}".format(dwg_count,dwg_file)

        except Exception as e:
            print "{}: Warning: Processing failed: {}, {}".format(dwg_count,dwg_file,str(e))
        finally:
            # 强制清理临时文件(无论成功失败)
            if temp_line_shp and arcpy.Exists(temp_line_shp):
                arcpy.Delete_management(temp_line_shp)
            if arcpy.Exists(shp_path) and not converted:
                arcpy.Delete_management(shp_path)
            if arcpy.Exists(prj_shp_path) and not converted:
                arcpy.Delete_management(prj_shp_path)
            # 强制删除临时GDB
            if arcpy.Exists(gdb_path):
                try:
                    arcpy.Delete_management(gdb_path)
                except:
                    print "Warning: temp.gdb{} deleted failed".format(gdb_path)

# 合并所有有效SHP文件
if shp_list:
    merged_shp = os.path.join(shp_dir, "merge.shp")
    try:
        arcpy.Merge_management(shp_list, merged_shp)
        print "Merge successfully {} files to: {}".format(len(shp_list),merged_shp)
        print "{} files converted failed. ".format(dwg_count-len(shp_list))
    except Exception as e:
        print "Warning: Merge failed: {}".format(str(e))
else:
    print("Warning: No valid files to be merged!")