背景

需要将1000多个界址点txt文件转换成shp文件,具体需求如下:

  • txt目录下有多个子目录,需要检索所有子目录中的txt文件,并将shp输出到相应的shp子目录下。

  • txt文件编码不统一,有的是UTF-8,有的是GB2312,因此读取txt文件前,需要先检测其编码。

  • 部分txt文件可能损坏或存在乱码,转换失败时需要给出错误提示,以便重新补充界址点文件。

  • 转换结束后,需要输出详细的日志文件,并保存在工作空间根目录。

运行环境

ArcGIS + Python 2.7

目录结构

├—— txt_to_shp.py       # 代码文件
├—— txt                 # 输入目录(界址点目录)
│    └─ txt01     # txt子目录
│          └─ 0001.txt # 界址点文件
│          └─ 0002.txt # 界址点文件
│    └─ txt02     # txt子目录
│          └─ 0101.txt # 界址点文件
│          └─ 0102.txt # 界址点文件
├—— shp                 # 输出目录(shp目录)
│    └─ shp01     # shp子目录
│         └─ 0001.shp # shp文件
│         └─ 0002.shp # shp文件
│    └─ shp02     # shp子目录
│         └─ 0101.shp # shp文件
│         └─ 0102.shp # shp文件

代码

# coding=UTF-8
import arcpy
import os
import sys
import codecs
import re  # 用于正则匹配非法符号
import io
import chardet
import sys
import datetime

# Python 2.7 only: make UTF-8 the interpreter-wide default encoding so that
# implicit str <-> unicode conversions of non-ASCII paths do not fail.
reload(sys)
sys.setdefaultencoding('utf-8')

# Encoding used by the operating system for file names.
fs_encoding = sys.getfilesystemencoding()

# Workspace layout (unicode literals so that Chinese paths are supported).
rootdir = u'E:\\jiezhidian'  # workspace root
txt_dir = os.path.join(rootdir, u'txt')  # input: boundary-point txt files
shp_dir = os.path.join(rootdir, u'shp')  # output: generated shapefiles
logging_filepath = os.path.join(rootdir, u'logging.txt')  # log file lives in the workspace root

# Running count of processed sub-directories.
dir_num = 0

# Most recent log line; it is both printed and stored in logging_list.
# Basic format: current time + message.
logging_text = "{}:  .Start converting……\n".format(
    datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

# Every log line produced during the run, starting with the banner above.
logging_list = [logging_text]

# Detect the encoding of a txt file
def detect_encoding(file_path):
    """Guess the text encoding of a file with chardet.

    The whole file is read as bytes and handed to ``chardet.detect``.

    Returns:
        A tuple ``(encoding, confidence)`` taken from chardet's result, or
        ``(None, None)`` when the file cannot be opened or read (IOError);
        in that case the error is printed to the console.
    """
    try:
        with open(file_path, 'rb') as stream:
            guess = chardet.detect(stream.read())
    except IOError as err:
        # The exception text is a byte string; turn it into unicode first,
        # then encode it for the console (falling back to GBK when the
        # console encoding is unknown).
        reason = unicode(str(err), 'utf-8', 'ignore')
        message = u"file open fail: %s" % reason
        console_encoding = sys.stdout.encoding or 'gbk'
        print(message.encode(console_encoding, 'replace'))
        return None, None
    else:
        return guess['encoding'], guess['confidence']

# Characters ArcGIS does not allow in names: \ / : * ? " < > |
invalid_pattern = re.compile(r'[\\/:\*\?"<>\|]')

# Spatial reference applied to every generated feature class.
# NOTE(review): WKID 4528 is presumably CGCS2000 3-degree GK zone 40 - confirm.
spatial_ref = arcpy.SpatialReference(4528)

# The output root must exist before it is used as the ArcPy workspace.
if not os.path.exists(shp_dir):
    os.makedirs(shp_dir)

# ArcPy environment: work inside the output root and let tools replace
# outputs that already exist.
arcpy.env.workspace = shp_dir
arcpy.env.overwriteOutput = True

# Walk every sub-directory of txt_dir, convert each *.txt boundary-point file
# into a polygon shapefile inside the matching shp sub-directory, then merge
# all shapefiles of that sub-directory into merge.shp.

# Horizontal rule printed in green around the per-directory banners.
_RULE = "|-----------------------------------------------------------------------------------------------------------------------------------------------------------------|"
_RULE_OPEN = "\033[32m\n\n" + _RULE + "\n\033[0m"
_RULE_CLOSE = "\033[32m\n" + _RULE + "\n\n\033[0m"


def _timestamp():
    """Return the current local time as 'YYYY-MM-DD HH:MM:SS' for log lines."""
    return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")


def _log(message, color=None):
    """Print *message* and append it to the global ``logging_list``.

    color: optional ANSI SGR parameter string (e.g. "1;31" for bold red).
        It only affects the console output; the stored line is uncoloured.
    """
    if color:
        print("\033[{}m{}\033[0m".format(color, message))
    else:
        print("{}".format(message))
    logging_list.append(message)


def _read_text(txt_path):
    """Return the content of *txt_path* decoded with its detected encoding.

    Falls back to the filesystem encoding when decoding with the detected
    encoding raises UnicodeDecodeError.
    """
    encoding = detect_encoding(txt_path)[0]
    try:
        with io.open(txt_path, 'r', encoding=encoding) as f:
            return f.read()
    except UnicodeDecodeError:
        with codecs.open(txt_path, 'r', fs_encoding) as f:
            return f.read()


def _parse_polygons(content):
    """Extract one vertex list per parcel from the text of a txt file.

    The text is split on "@"; the first non-empty chunk is skipped and each
    following chunk is one parcel. Within a chunk only lines starting with
    'J' and having at least four comma-separated fields are used: field 4
    becomes the point's X and field 3 its Y.

    Returns:
        A list of parcels, each a list of ``(x, y)`` float tuples.

    Raises:
        ValueError: a coordinate is not a number, a parcel has fewer than
            three points, or the file contains no parcel at all.
    """
    chunks = [chunk.strip() for chunk in content.split("@") if chunk.strip()]
    parcels = []
    for index, chunk in enumerate(chunks[1:], 1):
        ring = []
        for line in chunk.split('\n'):
            line = line.strip()
            if not line.startswith('J'):
                continue
            parts = line.split(',')
            if len(parts) >= 4:
                ring.append((float(parts[3]), float(parts[2])))
        # A polygon needs at least three vertices; fewer means the file is
        # damaged and must be reported instead of storing an empty shape.
        if len(ring) < 3:
            raise ValueError(
                "parcel {} has only {} valid boundary point(s)".format(index, len(ring)))
        parcels.append(ring)
    if not parcels:
        raise ValueError("no parcel found in file")
    return parcels


def _write_polygons(shp_path, parcels):
    """Insert one polygon per parcel into *shp_path* using a single cursor."""
    with arcpy.da.InsertCursor(shp_path, ["SHAPE@"]) as cursor:
        for ring in parcels:
            vertices = arcpy.Array([arcpy.Point(x, y) for x, y in ring])
            cursor.insertRow([arcpy.Polygon(vertices, spatial_ref)])


def _discard_shp(shp_path):
    """Delete a shapefile left behind by a failed conversion.

    Without this the next run would find the stub, report it as
    "already exists" and feed it into the merge.
    """
    try:
        if arcpy.Exists(shp_path):
            arcpy.Delete_management(shp_path)
    except Exception as e:
        _log("{}:   Warning: Could not remove incomplete SHP file {}: {}.\n".format(
            _timestamp(), shp_path, str(e)), "1;31")


for root, dirs, files in os.walk(txt_dir):
    for sub_dir in dirs:
        shp_list = []  # shapefile paths of this sub-directory, input of the merge
        file_num = 0   # number of txt files seen in this sub-directory
        dir_num = dir_num + 1  # number of sub-directories processed so far
        sub_txtdir = os.path.join(root, sub_dir)  # txt sub-directory
        # The shp sub-directory is named "shp" + last two characters of the txt one.
        sub_shpdir = os.path.join(shp_dir, "shp" + sub_dir[-2:])

        print(_RULE_OPEN)
        _log("{}:   {}:Start processing the subdirectory:{}.\n".format(
            _timestamp(), dir_num, sub_txtdir), "1;34")
        _log("{}:   SHP file will be converted to directory:{}.\n".format(
            _timestamp(), sub_shpdir), "1;34")
        print(_RULE_CLOSE)

        # Create the shp sub-directory on first use.
        if not os.path.exists(sub_shpdir):
            os.makedirs(sub_shpdir)

        for txt_file in os.listdir(sub_txtdir):
            if not txt_file.endswith('.txt'):
                continue
            file_num = file_num + 1
            txt_path = os.path.join(sub_txtdir, txt_file)
            _log("{}:   Start processing the txt file:{}\n".format(_timestamp(), txt_path))

            # The output shapefile carries the same base name as the txt file.
            base_name = os.path.splitext(txt_file)[0]
            shp_name = base_name + ".shp"
            shp_path = os.path.join(sub_shpdir, shp_name)
            _log("{}:   {} will be converted to :{}\n".format(_timestamp(), txt_path, shp_path))

            # Shapefiles produced by an earlier run are reused, not rebuilt.
            if os.path.exists(shp_path):
                shp_list.append(shp_path)
                _log("{}:   {}: SHP file {} already exists, skipping.\n".format(
                    _timestamp(), file_num, shp_name))
                continue

            finished = False  # True only once the shapefile is complete
            try:
                # Create the (still empty) polygon feature class.
                arcpy.CreateFeatureclass_management(
                    sub_shpdir,
                    shp_name,
                    "POLYGON",
                    spatial_reference=spatial_ref
                )
                content = _read_text(txt_path)

                converted = False
                try:
                    _write_polygons(shp_path, _parse_polygons(content))
                    converted = True
                except Exception as e:
                    _log("{}:   {}: Warning: Processing failed: {}, {}.\n".format(
                        _timestamp(), file_num, txt_file, str(e)), "1;31")

                if converted:
                    # Tag every polygon with the source file's base name.
                    arcpy.AddField_management(shp_path, "dzjgh", "TEXT", field_length=255)
                    arcpy.CalculateField_management(
                        shp_path, "dzjgh", "'{}'".format(base_name), "PYTHON")
                    shp_list.append(shp_path)
                    finished = True
                    _log("{}:   {}.Convert successfully: {}.\n".format(
                        _timestamp(), file_num, shp_name), "1;32")
            except Exception as e:
                _log("{}:   {}: Warning: Processing failed: {},{}.\n".format(
                    _timestamp(), file_num, txt_file, str(e)), "1;31")

            if not finished:
                _discard_shp(shp_path)

        if shp_list:
            merged_shp = os.path.join(sub_shpdir, "merge.shp")
            try:
                arcpy.Merge_management(shp_list, merged_shp)
            except Exception as e:
                _log("{}:   Warning: Merge failed: {}.\n".format(_timestamp(), str(e)), "1;31")
            else:
                print(_RULE_OPEN)
                _log("{}:   Subdirectory: {} merge successfully {} files to: {}.\n{}:   . {} files converted failed.\n".format(
                    _timestamp(), sub_shpdir, len(shp_list), merged_shp,
                    _timestamp(), file_num - len(shp_list)), "1;32")
                print(_RULE_CLOSE)
        else:
            # The timestamp placeholder is now actually filled in.
            _log("{}:   Warning: No valid files to be merged!\n".format(_timestamp()), "1;31")

# Write the collected log lines to the log file in the workspace root.
# Both steps are best-effort: a failure is reported on the console in red
# and the script carries on.
def _report_log_failure(action, error):
    """Print a red console warning that *action* on the log file failed.

    action: verb placed in the message ("Delete" or "Create").
    error: the exception that was caught.
    """
    stamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    warning = "{}:   Warning: {} {} failed. {}.\n".format(
        stamp, action, logging_filepath, str(error))
    print("\033[1;31m{}\033[0m".format(warning))


# Remove the log file of a previous run, if there is one.
try:
    if os.path.exists(logging_filepath):
        os.remove(logging_filepath)
except Exception as e:
    _report_log_failure("Delete", e)

# Each stored line is written followed by one extra newline.
try:
    with open(logging_filepath, "w") as f:
        f.writelines("{}\n".format(str(item)) for item in logging_list)
except Exception as e:
    _report_log_failure("Create", e)