Background
More than 1,000 parcel boundary point (界址点) txt files need to be converted to shp files. The requirements are:
The txt directory contains several subdirectories. All txt files in every subdirectory must be found, and the resulting shp files must be written to the corresponding shp subdirectory.
The txt files do not share a single encoding: some are UTF-8, others GB2312, so the encoding has to be detected before each file is read. Some txt files may be corrupted or contain garbled text; these must be reported as errors so the boundary point files can be re-supplied.
After the conversion finishes, a detailed log file must be written to the root of the workspace.
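Before the full script, here is a minimal sketch of the encoding check described above. It assumes the third-party chardet package is available; the check_txt helper name and the 0.7 confidence threshold are illustrative choices, not part of the converter itself.

# Minimal sketch: detect the encoding and flag suspect files (illustrative helper)
import chardet

def check_txt(path):
    with open(path, 'rb') as f:
        raw = f.read()
    guess = chardet.detect(raw)  # e.g. {'encoding': 'GB2312', 'confidence': 0.99}
    if guess['encoding'] is None or guess['confidence'] < 0.7:
        # likely corrupted or garbled; report it so the file can be re-supplied
        print "suspect boundary point file, please re-supply: %s" % path
        return None
    return raw.decode(guess['encoding'])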
Runtime Environment
ArcGIS + Python 2.7
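Note that chardet is a third-party package and may not be present in the ArcGIS Python 2.7 installation; a quick, illustrative way to check before running the script:

# Check that chardet is importable from the ArcGIS Python 2.7 interpreter
try:
    import chardet
except ImportError:
    print "chardet is not installed in this Python environment; install it into the ArcGIS Python first"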
Directory Structure
├── txt_to_shp.py          # the script
├── txt                    # input directory (boundary point files)
│   ├── txt01              # txt subdirectory
│   │   ├── 0001.txt       # boundary point file
│   │   └── 0002.txt       # boundary point file
│   └── txt02              # txt subdirectory
│       ├── 0101.txt       # boundary point file
│       └── 0102.txt       # boundary point file
└── shp                    # output directory (shp files)
    ├── shp01              # shp subdirectory
    │   ├── 0001.shp       # shp file
    │   └── 0002.shp       # shp file
    └── shp02              # shp subdirectory
        ├── 0101.shp       # shp file
        └── 0102.shp       # shp file
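The script below parses files such as 0001.txt on the assumption that they look roughly like the following (an illustrative sample, not real data): the block before the first "@" is a header and is skipped, each subsequent "@"-delimited block is one polygon, point lines start with "J", fields are comma-separated, and the coordinates sit in the third and fourth fields.

[header block: parcel id, owner, point count, ... - skipped by the script]
@
J1,1,3545123.45,40567234.56
J2,1,3545180.12,40567290.78
J3,1,3545140.33,40567350.90
@
J1,1,3545460.00,40567500.00
J2,1,3545520.00,40567560.00
J3,1,3545480.00,40567620.00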
Code
# coding=UTF-8
import arcpy
import os
import sys
import codecs
import re  # regex used to match characters that are illegal in names
import io
import chardet
import datetime

# Work around Python 2.7's default ASCII string encoding
reload(sys)
sys.setdefaultencoding('utf-8')

# File system encoding of the operating system
fs_encoding = sys.getfilesystemencoding()

# Paths (Unicode strings so Chinese characters are handled correctly)
rootdir = u'E:\\jiezhidian'  # workspace
txt_dir = os.path.join(rootdir, u'txt')
shp_dir = os.path.join(rootdir, u'shp')

dir_num = 0
logging_list = []  # list of log entries
# Log entry used both for console output and for appending to logging_list.
# Basic format: current timestamp + message.
logging_text = "{}: Start converting...\n".format(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
logging_list.append(logging_text)
logging_filepath = os.path.join(rootdir, u'logging.txt')  # log file saved in the workspace root
# Detect the encoding of a txt file
def detect_encoding(file_path):
    """Detect the file encoding and return (encoding, confidence)."""
    try:
        with open(file_path, 'rb') as f:
            raw_data = f.read()
        result = chardet.detect(raw_data)
        return result['encoding'], result['confidence']
    except IOError as e:
        # Convert the byte-string exception message to Unicode before printing
        error_msg = u"file open fail: %s" % unicode(str(e), 'utf-8', 'ignore')
        print error_msg.encode(sys.stdout.encoding or 'gbk', 'replace')
        return None, None
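# Example (illustrative, not part of the original script): calling
# detect_encoding(u'E:\\jiezhidian\\txt\\txt01\\0001.txt') might return
# something like ('GB2312', 0.99). A None encoding or a very low confidence
# value usually indicates a corrupted or garbled file that should be
# re-supplied rather than converted.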
# Create the shp output directory if it does not exist
if not os.path.exists(shp_dir):
    os.makedirs(shp_dir)

# ArcPy environment settings
arcpy.env.workspace = shp_dir
arcpy.env.overwriteOutput = True

# Target spatial reference (EPSG 4528)
spatial_ref = arcpy.SpatialReference(4528)

# Characters that ArcGIS does not allow in dataset names (\ / : * ? " < > |)
invalid_pattern = re.compile(r'[\\/:\*\?"<>\|]')
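# Note (assumption): EPSG 4528 is one of the CGCS2000 / 3-degree Gauss-Kruger
# projected coordinate systems; swap in the EPSG code that matches your own
# boundary point data if it uses a different projection.
# invalid_pattern is defined for sanitizing output names but is not applied below.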
# Walk every subdirectory under the txt directory
for root, dirs, files in os.walk(txt_dir):
    for sub_dir in dirs:
        shp_list = []   # shp paths produced in this subdirectory, used for the merge step
        file_num = 0    # number of txt files in this subdirectory
        dir_num = dir_num + 1  # subdirectory counter
        sub_txtdir = os.path.join(root, sub_dir)  # txt subdirectory
        # Matching shp subdirectory name, e.g. txt01 -> shp01
        sub_shpdir = os.path.join(shp_dir, "shp" + sub_dir[-2:])
        print "\033[32m\n\n|--------------------------------------------------------------------------------------------------------------------|\n\033[0m"
        logging_text = "{}: {}: Start processing subdirectory {}.\n".format(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), dir_num, sub_txtdir)
        logging_list.append(logging_text)
        print "\033[1;34m{}\033[0m".format(logging_text)
        logging_text = "{}: SHP files will be written to directory {}.\n".format(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), sub_shpdir)
        print "\033[1;34m{}\033[0m".format(logging_text)
        logging_list.append(logging_text)
        print "\033[32m\n|--------------------------------------------------------------------------------------------------------------------|\n\n\033[0m"
        # Create the shp subdirectory if it does not exist
        if not os.path.exists(sub_shpdir):
            os.makedirs(sub_shpdir)
        for txt_file in os.listdir(sub_txtdir):
            if txt_file.endswith('.txt'):
                file_num = file_num + 1
                txt_path = os.path.join(sub_txtdir, txt_file)
                logging_text = "{}: Start processing txt file {}\n".format(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), txt_path)
                print "{}".format(logging_text)
                logging_list.append(logging_text)
                # Output shp file name (same base name as the txt file)
                shp_name = os.path.splitext(txt_file)[0] + ".shp"
                shp_path = os.path.join(sub_shpdir, shp_name)
                logging_text = "{}: {} will be converted to {}\n".format(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), txt_path, shp_path)
                print "{}".format(logging_text)
                logging_list.append(logging_text)
                # Skip files whose shp output already exists
                if os.path.exists(shp_path):
                    shp_list.append(shp_path)
                    logging_text = "{}: {}: SHP file {} already exists, skipping.\n".format(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), file_num, shp_name)
                    print "{}".format(logging_text)
                    logging_list.append(logging_text)
                    continue
                converted = False  # conversion state; assume failure until the insert succeeds
                try:
                    # Create the polygon feature class
                    arcpy.CreateFeatureclass_management(
                        sub_shpdir,
                        shp_name,
                        "POLYGON",
                        spatial_reference=spatial_ref
                    )
                    # Read the txt file with the detected encoding
                    encoding = detect_encoding(txt_path)[0]
                    try:
                        with io.open(txt_path, 'r', encoding=encoding) as f:
                            content = f.read()
                    except UnicodeDecodeError:
                        # Fall back to the file system encoding
                        with codecs.open(txt_path, 'r', fs_encoding) as f:
                            content = f.read()
                    try:
                        # Split into polygon blocks on "@", dropping empty blocks
                        features = [block.strip() for block in content.split("@") if block.strip()]
                        with arcpy.da.InsertCursor(shp_path, ["SHAPE@"]) as cursor:
                            # The block before the first "@" is the file header, so skip it
                            for feature in features[1:]:
                                points = []
                                for line in feature.split('\n'):
                                    line = line.strip()
                                    # Boundary point lines start with "J" and are comma-separated
                                    if line.startswith('J'):
                                        parts = line.split(',')
                                        if len(parts) >= 4:
                                            x = float(parts[2])
                                            y = float(parts[3])
                                            # The 3rd and 4th fields are swapped into (X, Y)
                                            # to match the ArcGIS axis order
                                            points.append(arcpy.Point(y, x))
                                # Build the polygon and insert it
                                polygon = arcpy.Polygon(arcpy.Array(points), spatial_ref)
                                cursor.insertRow([polygon])
                        converted = True
                    except Exception as e:
                        logging_text = "{}: {}: Warning: Processing failed: {}, {}.\n".format(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), file_num, txt_file, str(e))
                        print "\033[1;31m{}\033[0m".format(logging_text)
                        logging_list.append(logging_text)
                    if converted:
                        # Record the source file name in a text field
                        arcpy.AddField_management(shp_path, "dzjgh", "TEXT", field_length=255)
                        arcpy.CalculateField_management(shp_path, "dzjgh", "'{}'".format(os.path.splitext(txt_file)[0]), "PYTHON")
                        shp_list.append(shp_path)
                        logging_text = "{}: {}: Converted successfully: {}.\n".format(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), file_num, shp_name)
                        print "\033[1;32m{}\033[0m".format(logging_text)
                        logging_list.append(logging_text)
                except Exception as e:
                    logging_text = "{}: {}: Warning: Processing failed: {}, {}.\n".format(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), file_num, txt_file, str(e))
                    print "\033[1;31m{}\033[0m".format(logging_text)
                    logging_list.append(logging_text)
        # Merge all shp files produced in this subdirectory
        if shp_list:
            merged_shp = os.path.join(sub_shpdir, "merge.shp")
            try:
                arcpy.Merge_management(shp_list, merged_shp)
                print "\033[32m\n\n|--------------------------------------------------------------------------------------------------------------------|\n\033[0m"
                logging_text = "{}: Subdirectory {}: merged {} files into {}.\n{}: {} files failed to convert.\n".format(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), sub_shpdir, len(shp_list), merged_shp, datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), file_num - len(shp_list))
                print "\033[1;32m{}\033[0m".format(logging_text)
                logging_list.append(logging_text)
                print "\033[32m\n|--------------------------------------------------------------------------------------------------------------------|\n\n\033[0m"
            except Exception as e:
                logging_text = "{}: Warning: Merge failed: {}.\n".format(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(e))
                print "\033[1;31m{}\033[0m".format(logging_text)
                logging_list.append(logging_text)
        else:
            logging_text = "{}: Warning: No valid files to be merged!\n".format(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
            print "\033[1;31m{}\033[0m".format(logging_text)
            logging_list.append(logging_text)
# Write the log list to a file in the workspace root
try:
    try:
        # Remove any log file left over from a previous run
        if os.path.exists(logging_filepath):
            os.remove(logging_filepath)
    except Exception as e:
        logging_text = "{}: Warning: Delete {} failed. {}.\n".format(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), logging_filepath, str(e))
        print "\033[1;31m{}\033[0m".format(logging_text)
    try:
        with open(logging_filepath, "w") as f:
            for item in logging_list:
                content = "{}\n".format(str(item))
                f.write(content)
    except Exception as e:
        logging_text = "{}: Warning: Create {} failed. {}.\n".format(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), logging_filepath, str(e))
        print "\033[1;31m{}\033[0m".format(logging_text)
except Exception as e:
    logging_text = "{}: Warning: Create {} failed. {}.\n".format(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), logging_filepath, str(e))
    print "\033[1;31m{}\033[0m".format(logging_text)
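After a run, one way to sanity-check the output is to count the features in each merged shapefile. This is a small sketch, assuming the directory layout and the merge.shp names produced by the script above:

# Post-run check (illustrative): count features in every merge.shp
import os
import arcpy

shp_dir = u'E:\\jiezhidian\\shp'
for name in os.listdir(shp_dir):
    merged = os.path.join(shp_dir, name, u'merge.shp')
    if os.path.exists(merged):
        count = int(arcpy.GetCount_management(merged).getOutput(0))
        print u"{}: {} features".format(merged, count)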