{"id":174,"date":"2012-03-31T13:43:44","date_gmt":"2012-03-31T20:43:44","guid":{"rendered":"http:\/\/blog.light42.com\/wordpress\/?p=174"},"modified":"2012-06-19T22:11:41","modified_gmt":"2012-06-20T05:11:41","slug":"postgis-subtraction","status":"publish","type":"post","link":"http:\/\/blog.light42.com\/wordpress\/?p=174","title":{"rendered":"PostGIS subtraction"},"content":{"rendered":"<p>Threading in python and calling out to PostGIS in each thread can work well. Here is an example of using python&#8217;s <strong>Queue<\/strong> and <strong>threading.Thread<\/strong> to break up a large spatial workload into pieces. It turns out that many spatial data problems are naturally parallelizable, and this is one of them.<\/p>\n<p>Per the <a title=\"PostGIS Manual\" href=\"http:\/\/postgis.refractions.net\/documentation\/manual-svn\/ST_Difference.html\" target=\"_blank\">PostGIS manual<\/a>, subtraction of one geometry from another, <code>ST_Difference( geomA, geomB)<\/code> might be described as<\/p>\n<pre>GeometryA - ST_Intersection(A,B)<\/pre>\n<p>&nbsp;<script type=\"text\/javascript\">jQuery(document).ready(function($) { $(\"#subtrgallery a\").lightBox({fixedNavigation:true});});<\/script><\/p>\n<div id=\"subtrgallery\">\n<a href=\"wp-content\/uploads\/2012\/03\/class_minus_parcels.png\"><img decoding=\"async\" src=\"wp-content\/uploads\/2012\/03\/class_minus_parcels-300x184.png\" alt=\"\" align=\"right\" \/><\/a><\/div>\n<p> A table of geometries can be processed easily if either geomA or geomB is a constant. But what if there are many thousands of geomA, and hundreds of thousands of geomB? Our example is <strong>land type classifications<\/strong> (geomA) and <strong>California county land parcels<\/strong> (geomB). <\/p>\n<p>Here is a look at a finished result:<\/p>\n<p>Areas that are colored either green, dark red or gray, are the sum of the area of parcel property in this county, classified into three categories (<em>greenfield, constrained, urban<\/em>). The remaining yellowish are largely right-of-way (roads, paths, freeways etc). The analysis is done by bringing together two very large input tables &#8211; a land classification of the county divided into three classes (14,000 polygons in this example), and the set of parcel properties (~800,000 records).<\/p>\n<p>Start by subtracting all parcels <strong>p<\/strong> that touch a given classification polygon <strong>l<\/strong> as <em>inner diff<\/em><\/p>\n<blockquote>\n<pre>INSERT into landtypes_minus_parcels\r\nSELECT\r\n      l.id_lt,\r\n      l.landtype as landtype_seamless_orig,\r\n      l.other_class,\r\n      CASE when\r\n        st_difference(  l.wkb_geometry,\r\n               st_union(p.wkb_geometry)  )\r\n        is null then l.wkb_geometry\r\n      else\r\n        st_difference(  l.wkb_geometry,\r\n               st_union(p.wkb_geometry)  )\r\n      end\r\n       as tgeom\r\nFROM\r\n   %s l\r\n   inner join\r\n   %s p\r\n   on st_intersects(l.wkb_geometry, p.wkb_geometry)\r\nWHERE l.id_lt = %s\r\nGROUP BY l.id_lt, l.wkb_geometry;<\/pre>\n<\/blockquote>\n<p>Get a list of primary keys in python from the classification table <strong>l<\/strong><\/p>\n<blockquote>\n<pre>tSQL = '''\r\nselect id_lt from %s order by id_lt;\r\n''' % (targ_landtypes)\r\n\r\ntry:\r\n    gCurs.execute( tSQL )\r\n    gDB.commit()\r\nexcept Exception, E:\r\n    print str(E)\r\n    raise\r\n\r\nidsA  = []\r\nidsTA = gCurs.fetchall()\r\nfor tElem in idsTA: idsA.append( tElem[0] )<\/pre>\n<\/blockquote>\n<p>A <strong>psycopg2<\/strong> query cursor returns an array of tuple &#8211; for convenience take the one and only one result per tuple and build a simple list from them. In this analysis, we take the length of the list of primary keys of classification polygons, divide by the desired number of threads, to get a number of classification polygons that will be handled per thread. Since we have an ordered list of primary keys, it is straightforward to pick two keys at a time &#8211; upper and lower pkey &#8211; and pass them into each thread as dictionary arguments.<\/p>\n<blockquote>\n<pre>tBaseDiv = len( idsA ) \/ gThreadCount\r\ntBaseOffset = 0\r\nfor i in range( gThreadCount ):\r\n    if i == gThreadCount - 1:\r\n       ## last bucket gets any remainder, too\r\n       tEndInd = len( idsA ) - 1\r\n    else:\r\n       tEndInd = tBaseOffset + tBaseDiv - 1\r\n\r\n    tD = {\r\n      'start_id': idsA[ tBaseOffset ],\r\n      'end_id': idsA[ tEndInd ]\r\n    }\r\n\r\n    tBaseOffset += tBaseDiv\r\n    queue.put( tD )<\/pre>\n<\/blockquote>\n<p>Next, a unit of work has to be described; a <strong>threading.Thread<\/strong> subclass could be defined as follows: upon running, retrieve a parameter Dict from the threads queue, and from it the upper and lower primary key bounds for this thread; for each primary key in this range, execute the <em>inner diff<\/em> query; when completed, commit() to the database and signal that this thread is complete.<\/p>\n<blockquote>\n<pre>class Thread_ComputeCTWUnit(threading.Thread):\r\n    \"\"\"Threaded Unit of work\"\"\"\r\n    def __init__(self, queue):\r\n      threading.Thread.__init__(self)\r\n      self.queue = queue\r\n\r\n    def run(self):\r\n      while True:\r\n        #grabs host from queue\r\n        jobD = self.queue.get()\r\n\r\n        tQueuedJobStr = '''\r\n  select id_lt from %s where id_lt &gt;= %s and id_lt &lt; = %s\r\n''' % ( targ_landtypes, jobD['start_id'], jobD['end_id'] )\r\n\r\n        #tTmpAnswer = tLocalCurs.fetchone()[0]\r\n        #print 'start_id=',jobD['start_id'],'; end_id=',jobD['end_id'],'; count=',str(tTmpAnswer)\r\n\r\n        ##---------------------------\r\n        ## each thread has its own pyscopg2 connection\r\n        try:\r\n          tLocalConn = psycopg2.connect( gDSN )\r\n        except Exception, E:\r\n          print str(E),' start_id=',jobD['start_id']\r\n          self.queue.task_done()\r\n          return\r\n\r\n        tLocalCurs = tLocalConn.cursor()\r\n        try:\r\n          tLocalCurs.execute( tQueuedJobStr )\r\n        except Exception, E:\r\n          print str(E)\r\n\r\n        tTA_local = tLocalCurs.fetchall()\r\n        for tInd in tTA_local:\r\n            try:\r\n                tLocalCurs.execute( tSQL_primediff % (targ_landtypes,src_parcels,tInd[0]) )\r\n                #tLocalConn.commit()\r\n            except Exception, E:\r\n                print str(E)\r\n                raise\r\n\r\n        tLocalConn.commit()\r\n        tLocalConn.close()\r\n        ##---------------------------\r\n        #signals to queue job is done\r\n        self.queue.task_done()\r\n        return<\/pre>\n<\/blockquote>\n<p>Given <strong>14,000 classified polygons<\/strong> as input, <strong>parcel records<\/strong> for a county, running on a 16 core linux machine, and creating 14 threads, each thread will run 1000 queries then commit() back to the database. The runtime in practice is about 15%-30% of the time it would take without threading. One reason for this is that even though each thread has the same number of classification polygons to work on, not all polygon subtractions are created equal, so inevitably some sets of queries take longer than others.<\/p>\n","protected":false},"excerpt":{"rendered":"<p>Threading in python and calling out to PostGIS in each thread can work well. Here is an example of using python&#8217;s Queue and threading.Thread to break up a large spatial workload into pieces. It turns out that many spatial data problems are naturally parallelizable, and this is one of them. Per the PostGIS manual, subtraction [&hellip;]<\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"closed","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[5,4],"tags":[],"_links":{"self":[{"href":"http:\/\/blog.light42.com\/wordpress\/index.php?rest_route=\/wp\/v2\/posts\/174"}],"collection":[{"href":"http:\/\/blog.light42.com\/wordpress\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"http:\/\/blog.light42.com\/wordpress\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"http:\/\/blog.light42.com\/wordpress\/index.php?rest_route=\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"http:\/\/blog.light42.com\/wordpress\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=174"}],"version-history":[{"count":84,"href":"http:\/\/blog.light42.com\/wordpress\/index.php?rest_route=\/wp\/v2\/posts\/174\/revisions"}],"predecessor-version":[{"id":591,"href":"http:\/\/blog.light42.com\/wordpress\/index.php?rest_route=\/wp\/v2\/posts\/174\/revisions\/591"}],"wp:attachment":[{"href":"http:\/\/blog.light42.com\/wordpress\/index.php?rest_route=%2Fwp%2Fv2%2Fmedia&parent=174"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"http:\/\/blog.light42.com\/wordpress\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&post=174"},{"taxonomy":"post_tag","embeddable":true,"href":"http:\/\/blog.light42.com\/wordpress\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=174"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}