Geoprocessing scripts 利用多核进行计算

it2022-05-09  31

    作者:Flyingis     本文欢迎友情转载,但请注明作者及原文链接,严禁用于商业目的!       让ArcGIS桌面软件进行GP脚本并行运算确实不是一件容易的事情,一方面是粗粒度AO对象的限制,另外一方面是Python本身的问题。     Python是解释型的语言,使用GIL全局解释器锁在内部禁止并行运算,因此在相同时间内只能有一条指令被执行,为什么存在GIL?是因为Python解释器后台的不可见变量,比如为了进行垃圾回收而维护的引用计数,如果没有GIL,则可能出现由于线程切换导致的对同一对象释放两次的情况( 参考该文 ),Jython和IronPython没有GIL问题,倒是可以拿来一试。对于ArcGIS 9.3.1,使用的Python版本为2.5,目前最新的Python 3.0都发布了,从2.6开始,Python新增multiprocessing模块来 解决这个问题 ,ArcGIS 9.4将支持这一版本。     那么,Parallel Python不能解决所有问题,又必须得使用Python 2.5,如何有效利用多核计算(注意,是利用多核计算,并非并行计算)?通常情况下,我们会将多个任务分解成不同的脚本,开启多个Python解释器或ArcCatalog运行,每个程序占用一个核,多核可以同时利用起来,各自处理完成后,进行汇总,得到最终计算结果。ESRI Resource Center上提供了一种类似的方法,差别在于将所有任务都写在一个Python脚本中,实际上和前者的原理相同,作为参考。 代码   1  #  Author: ESRI   2  #  Date:   August 2009   3  #  Purpose: This script demonstrates a methodology for parallel geoprocessing in ArcGIS Desktop.   4  #           The input is split into parts which are processed by separate subprocesses,   5  #           their outputs are appended into the final result.  In this specific case, the   6  #           geoprocessing undertaken is address geocoding, but any feasible geoprocessing   7  #           can be done by simple modification of this script.   8  #           Look for the comment line "Begin code block for GP process of interest"   9  try :  10       import  arcgisscripting, os, subprocess, sys, time, traceback  11      gp  =  arcgisscripting.create( 9.3 )  12      gp.overwriteoutput  =  True  13      startTime  =  time.time()  14        15       # Get the number of concurrent GP processes desired.  16       # It is recommended this agree with the number of your available cores.  17      pCount  =  gp.getparameter(0)  18      inObject  =  gp.getparameterastext( 1 )  19      result  =  gp.getcount_management(inObject)  20      count  =  int(result.getoutput(0))  21      gp.addmessage( " Processing  "   +  inObject  +   "  in  "   +  str(pCount)  +   "  concurrent processes. " )     22       # We will use a generic approach to splitting the input into parts, namely slicing it  23       # with modulo arithmetic.  For example, if we want to run 4 concurrent processes  24       # we create 4 parts from the input using "ObjectID modulo 4 = 0 (then 1,2,3)"  25        26       # Build the splitting expressions(Note you will need to edit this code block if using input from SDE on Oracle)  27      inDesc  =  gp.describe(inObject)  28      delimitedOIDName  =  str(gp.addfielddelimiters(inObject,str(inDesc.oidfieldname)))  29      wsType  =  str(gp.describe(str(inDesc.path)).workspacetype)  30      queryList  =  []  31       for  q  in  range(pCount):  32           if  wsType  ==   " RemoteDatabase " # SDE  33               # Non-Oracle  34              queryList.append(delimitedOIDName  +   "  %  "   +  str(pCount)  +   "  =  "   +  str(q))  35               # Oracle - comment out the above line and uncomment the below line when using Oracle inputs  36               # queryList.append("mod(\"" + delimitedOIDName + "\"," + str(pCount) + ") = " + str(q))  37           elif  wsType  ==   " FileSystem " # DBase  38              queryList.append( " mod(\ ""  + delimitedOIDName +  " \ " , "   +  str(pCount)  +   " ) =  "   +  str(q))  39           elif  wsType  ==   " LocalDatabase "   and  inDesc.path.endswith( " .mdb " ):  # Personal GDB  40              queryList.append(delimitedOIDName  +   "  mod  "   +  str(pCount)  +   "  =  "   +  str(q))  41           elif  wsType  ==   " LocalDatabase "   and  inDesc.path.endswith( " .gdb " ):  # File GDB  42              queryList.append( " mod(\ ""  + delimitedOIDName +  " \ " , "   +  str(pCount)  +   " ) =  "   +  str(q))  43       # Create a seed name for the parts  44       if  inDesc.name.endswith(( " .dbf " , " .shp " )):  45          seedName  =  str(inDesc.name).split( " . " )[0]  46       elif  inDesc.name.count( " . " >   1 # Fully qualified DBMS object name  47          seedName  =  str(inDesc.name).split( " . " )[ - 1 ]  48       else :  49          seedName  =  str(inDesc.name)  50            51       # Make file GDBs at a well known path (to the system); edit this to use fast disk if you have it.  52       # The child scripts will write into these separate file GDBs so as to avoid lock conflicts.  53       # import tempfile  54       # wkPath = tempfile.gettempdir()  55      wkPath  =  r " C:\Temp "  56      gdbNameList  =  []  57      j  =  0  58       for  i  in  range(pCount):  59          gdbName  =  wkPath + " \parallel " + str(i) + " .gdb "  60           while  gp.exists(gdbName):  # Do not clobber other parallel processes  61              j += 1  62              gdbName  =  wkPath + " \parallel " + str(j) + " .gdb "  63          gdbNameList.append(gdbName)  64          result  =  gp.createfilegdb_management(wkPath,os.path.basename(gdbName))  65      gp.workspace  =   " in_memory "  66        67       # Create scripts that geoprocess the parts, write these into the file GDB directories created above  68      scriptPathList  =  []  69       for  r  in  range(pCount):  70          gdbName  =  gdbNameList[r]  71          sName  =  gdbName + " /parallel.py "  72          scriptPathList.append(sName)  73          scriptLines  =  []  74          scriptLines.append( " import arcgisscripting\n " )  75          scriptLines.append( " gp = arcgisscripting.create(9.3)\n " )  76          scriptLines.append( " gp.overwriteoutput = True\n " )  77           if  str(inDesc.datatype)  ==   " Table "   or  str(inDesc.datatype)  ==   " DbaseTable " :  78              scriptLines.append( " result = gp.tabletotable_conversion( " + \  79                                  " \ "" +inObject+ " \ " , " + \  80                                  " \ "" +gdbName+ " \ " , " + \  81                                  " \ "" +seedName+str(r)+ " \ " , " + \  82                                  " \ "" +queryList[r]+ " \ " )\n " )  83           else :  84              scriptlines.append( " result = gp.featureclasstofeatureclass_conversion( " + \  85                                  " \ "" +inObject+ " \ " , " + \  86                                  " \ "" +gdbName+ " \ " , " + \  87                                  " \ "" +seedName+str(r)+ " \ " , " + \  88                                  " \ "" +queryList[r]+ " \ " )\n " )  89          scriptLines.append( " part = result.getoutput(0)\n " )  90           #  91           # Begin code block for GP process of interest:  92          scriptLines.append( " aLocator = r\ " D:\Dev\Python\GP_multicore\TestData.gdb\NZ Locator\ "" + " \n " )  93           # We could use a service.  Note the \\a in \\arcgis below would be a BELL literal if not escaped as in \a (rcgis)  94           # scriptLines.append("aLocator = r\"GIS Servers\\arcgis on bruceh\CA_Locator.GeocodeServer\""+"\n")  95          scriptLines.append( " fieldMap = \ " Address Address;Suburb Suburb;Mailtown Mailtown;Postcode Postcode\ " \n " )  96          scriptLines.append( " outFC =  " + " \ "" +gdbName+ " \\ " + " Result " +str(r)+ " \ " \n " )  97          scriptLines.append( " result = gp.geocodeaddresses_geocoding(part,aLocator,fieldMap,outFC)\n " )  98          scriptLines.append( " result = result.getoutput(0)\n " )  99           # End code block for GP process of interest: 100           # 101           # Make stdio aware of completion - this message will be returned to the GP stdio message pipe 102          scriptLines.append( " print  " +   " \ " Finished Part "  + str(r) +  " \ " \n " ) 103          scriptLines.append( " del gp\n " ) 104           # Write script 105          f  =  open(sName, ' w ' ) 106          f.writelines(scriptLines) 107          f.flush 108          f.close() 109       # Launch the parallel processes 110      gp.addmessage( " Launching parallel processes\n " ) 111      processList  =  [] 112       for  r  in  range(pCount): 113          processList.append(subprocess.Popen([r " C:\Python25\python.exe " ,scriptPathList[r]],\ 114                                              shell = True,\ 115                                              stdout = subprocess.PIPE,\ 116                                              stderr = subprocess.PIPE)) 117          gp.addmessage( " Launched process  "   +  str(r) +   " \n " ) 118          time.sleep( 2 ) 119       # 120       # Wait for completion 121      messageList  =  [] 122      Done  =  False 123       while   not  Done: 124          Done  =  True 125           for  p  in  processList: 126               if  p.poll()  is  None: 127                  Done  =  False 128               else : 129                  messageList.append( " Process  "   +  str(processList.index(p))  +   "  complete at  "   +  str(time.ctime())  +   " \n " ) 130              stdOutLines  =  p.stdout.readlines() 131               for  s  in  stdOutLines: 132                  messageList.append(s) 133              stdErrorLines  =  p.stderr.readlines() 134               for  e  in  stdErrorLines: 135                  messageList.append(e) 136               if   not  Done:  # Save 2 seconds if complete 137                  time.sleep( 2 ) 138       for  m  in  messageList: 139          gp.addmessage(m) 140       # Merge the results 141      outFC  =  gp.getparameterastext( 2 ) 142      fm  =  gp.createobject( " fieldmappings " ) 143      vt  =  gp.createobject( " valuetable " ) 144       for  g  in  gdbNameList: 145          gp.workspace  =  g 146           if  len(gp.listfeatureclasses( " Result* " ))  >  0: 147              fcDesc  =  gp.describe(gp.listfeatureclasses( " Result* " )[0]) 148              fm.addtable(fcDesc.catalogpath) 149              vt.addrow(fcDesc.catalogpath) 150      gp.workspace  =   " in_memory " 151      result  =  gp.merge_management(vt,outFC,fm) 152       # Clean up 153       for  g  in  gdbNameList: 154          result  =  gp.delete_management(g) 155       # Done! 156       157      endTime  =  time.time() 158      gp.addmessage( " Geoprocessing duration was  "   +  str(int(endTime  -  startTime))  +   "  seconds. " ) 159      gp.addmessage( " Net performance was  "   +  \ 160                    str(int( 1.0   *  count  /  (endTime  -  startTime)  *   3600.0 ))  +   "  records per hour. " ) 161       del  gp 162       163  except : 164      tb  =  sys.exc_info()[ 2 ] 165      tbinfo  =  traceback.format_tb(tb)[0] 166      pymsg  =   " PYTHON ERRORS:\nTraceback Info:\n "   +  tbinfo  +   " \nError Info:\n     "   +  \ 167              str(sys.exc_type) +   " "   +  str(sys.exc_value)  +   " \n " 168      gp.AddError(pymsg) 169       170      msgs  =   " GP ERRORS:\n "   +  gp.GetMessages( 2 +   " \n " 171      gp.AddError(msgs)

转载于:https://www.cnblogs.com/flyingis/archive/2009/12/01/1614334.html


最新回复(0)