
Source Code for Module buildxml.plugins.portal

#!/usr/bin/python
# -*- coding: utf-8 -*-

"""
Contains a generic plugin that can query Plone portals.

@author: Johannes Schwenk
@copyright: 2010, Johannes Schwenk
@version: 3.0
@date: 2010-09-15

"""

import sys

# Important!
reload(sys)
sys.setdefaultencoding('utf-8')

import os
import codecs
import shutil


from urlparse import urlsplit
from datetime import datetime
from time import sleep
from xmlgetter.plugin import BaseSyncPlugin
from xmlgetter.state import PortalSourceState

from config import TEMP_DIR, TEMP_FILE_EXT, MAX_PORTAL_RETRIES, \
    PORTAL_REQUEST_INCREMENT, PORTAL_RETRY_WAIT

# Important!
reload(sys)
sys.setdefaultencoding('utf-8')

class SyncPlugin_portal(BaseSyncPlugin):
    """
    This is a generic plugin that incrementally queries Plone portals for
    changes in their C{portal_catalog}.

    It is instantiated by L{xmlgetter.controller} for every entry in the
    L{config.PORTALS} variable in L{config}, and then its L{run()} method
    from the base class is called.


    Note: If you change the class name, you have to change
    L{config.PORTAL_PLUGIN_NAME} as well. It also has to match the
    module's name!

    """

    _state = None
    """
    @ivar: The state of the plugin.
    @type: L{PortalSourceState}

    """

    _updated = None
    """
    @ivar: Stores the date and time of the last update.
    @type: datetime

    """

    def __init__(self, source_name, url, NO_NET=False):
        """
        Initialize the plugin and its state, and set some variables for
        use in this plugin.

        @param source_name: The name of the source as displayed in logfile
            entries and statistics.
        @type source_name: string
        @param url: The URL of the script C{remoteSyncQueryXML} in the Plone
            portal.
        @type url: string
        @param NO_NET: Whether to actually fetch data from the net, or to
            reuse the data from a previous run - if available.
        @type NO_NET: bool

        """
        BaseSyncPlugin.__init__(self, source_name, url, NO_NET)
        self._source_name = source_name
        self._state = PortalSourceState(source_name)
        self._updated = None
        self._split_url = urlsplit(url)
        self._base_url = u'%s://%s' % \
            (self._split_url.scheme, self._split_url.netloc)

    def _loadState(self):
        """
        Load the state of the plugin from its state file.

        @return: C{True} if the state could be loaded, C{False} otherwise.
        @rtype: bool

        """
        state = self._state.load()
        if state:
            self._state = state
            return True
        self._stats.messages.append(u'WARNING: Could not load portal state.')
        self._stats.status = u'W'
        return False

    def _writeState(self):
        """
        Write the state of the plugin to its state file.

        @return: C{True} if the state could be successfully written.
        @rtype: bool

        """
        return self._state.write()

    def _getData(self):
        """
        Incrementally get all data from the Plone portal.

        Queries are sent to the portal until the response consists only of
        the string u'END'. Each query requests the next batch of entries,
        whose size is specified in L{config.PORTAL_REQUEST_INCREMENT}. If a
        request fails, the plugin waits for the number of seconds specified
        in L{config.PORTAL_RETRY_WAIT} and retries, up to the number of
        attempts specified in L{config.MAX_PORTAL_RETRIES}.

        The result of the queries is written to
        L{_intermediate_temp_filename} and is on success moved to
        L{_temp_filename}, which is then processed by L{self._consolidate}.


        @return: C{False} if an error or warning occurred, C{True} otherwise.
        @rtype: bool

        """
        self._updated = datetime.now()

        res = ''
        idx = 0

        self.logger.info(u'Requesting "%s" and subsequent increments'
                % self._url)

        try:
            f = codecs.open(self._intermediate_temp_filename, u'w', u'utf-8')
        except IOError, e:
            self.logger.exception(u'Error opening file "%s": %s'
                    % (self._intermediate_temp_filename, e))
            self._stats.messages.append(u'ERROR: Error opening file "%s": %s'
                    % (self._intermediate_temp_filename, e))
            self._stats.status = u'F'
            return False

        while f and not res == 'END':
            data = {u'last_query': self._state.last_query,
                    u'from_idx': idx,
                    u'increment': PORTAL_REQUEST_INCREMENT}

            attempt = 0
            response = False

            while attempt < MAX_PORTAL_RETRIES and not response:
                attempt = attempt + 1
                response = self._requestURL(self._url, data)
                if not response:
                    self.logger.info(u'Retry! Waiting for %s seconds ...'
                            % PORTAL_RETRY_WAIT)
                    sleep(PORTAL_RETRY_WAIT)

            if response and hasattr(response, u'code') and response.code < 400:
                # The request was successful, we have data.
                try:
                    res = response.read()
                except Exception, e:
                    self.logger.exception(
                            u'Reading response failed: %s' % e)
                    self._stats.messages.append(u'ERROR: Reading response'
                            u' failed: %s' % e)
                    self._stats.status = u'F'
                    return False
                else:
                    try:
                        if not res == 'END':
                            f.write(res)
                    except Exception, e:
                        self.logger.exception(
                                u'Writing to file failed: %s' % e)
                        self._stats.messages.append(u'ERROR: Writing to file'
                                u' failed: %s' % e)
                        self._stats.status = u'F'
                        return False
            else:
                self.logger.warn(u'Reading from server failed, data may be'
                        u' incomplete! Removing temporary file %s ...'
                        % self._temp_filename)
                self._stats.messages.append(u'ERROR: Reading from server'
                        u' failed, data may be incomplete! Removing temporary'
                        u' file %s ...' % self._temp_filename)
                os.remove(self._temp_filename)
                self._stats.status = u'F'
                return False

            idx = idx + PORTAL_REQUEST_INCREMENT

        shutil.move(self._intermediate_temp_filename, self._temp_filename)
        self.logger.debug(u'Moved %s to %s'
                % (self._intermediate_temp_filename, self._temp_filename))
        self.logger.info(u'"%s" read' % self._url)
        return True

    def _consolidate(self):
        """
        Consolidate the existing data for the portal with the new entries
        fetched by L{_getData}.

        There are four possibilities:

          1. The entry is completely B{new}:
              - It has content
              - It B{has no} entry in the portal's state data
          2. The entry was B{modified}:
              - It has content
              - It B{has} an entry in the portal's state data
          3. The entry is not new and was not modified, a B{stub}:
              - It has no content
              - It B{has} an entry in the portal's state data
          4. The entry is static:
              - It has content
              - It has a <static /> tag

        In the first two cases, the newly fetched entry is written to the
        file L{_intermediate_xml_filename}, which is on success moved to
        L{_xml_filename}.

        In the third case, the entry is copied from the old data. To do this
        efficiently, the plugin saves a "URL to position and length" mapping
        in its state. So to read an entry from the old data, we only have to
        seek to the right position in the old data's file and copy the
        specified number of bytes over to the new data's file. Since the
        entries sent by the portal are sorted ascending by modification time,
        and are always processed in that order, we never have to seek
        backwards in the old data's file, which increases performance.


        @todo:
            This can be serious: if an object in the portal reports a wrong
            modification date, the following can happen. Object A does not
            exist when the first run is initialized. On the second run,
            object A exists, but reports a modification date earlier than
            that of the first run, so it is sent as a stub, although the
            data of A has never been indexed before!
            If an entry is modified during the update and has already been
            acquired as being unmodified, there is a slim chance that the
            entry appears both as modified and as a stub.
            If an entry is deleted during the update, there is a chance of
            receiving a duplicate or missing an entry.
            This case could also happen if the modification date is not used
            properly in C{remoteSyncQueryXML} - investigate!
            It could be solved by discarding incomplete entries, issuing a
            warning with the full URL of the entry, and doing a full run
            every week or so... Or, collect all URLs of the false stub
            entries and request them afterwards one by one. That would
            require changes in C{remoteSyncQueryXML}. The second option
            sounds best...
            This has B{no priority} at the moment, since the chances of this
            occurring are very slim.


        @return: C{False} if an error or warning occurred, C{True} otherwise.
        @rtype: bool

        """

        fetched_data = None
        old_data = None

        # Open the file with the newly fetched data.
        try:
            fetched_data = codecs.open(self._temp_filename, u'r')
        except IOError, e:
            self.logger.exception(u'Error opening file: %s' % e)
            self._stats.messages.append(u'ERROR: Error opening file: %s' % e)
            self._stats.status = u'F'
            return False

        # Open the file with the old data from the preceding run.
        if os.path.exists(self._xml_filename):
            try:
                old_data = codecs.open(self._xml_filename, u'r')
            except IOError, e:
                self.logger.exception(u'Error opening file: %s' % e)
                self._stats.messages.append(u'ERROR: Error opening'
                        u' file: %s' % e)
                self._stats.status = u'F'
                return False
        else:
            self.logger.warn(u'No data from previous run found')

        # Open the file that will contain the consolidated data.
        try:
            intermediate_data = \
                codecs.open(self._intermediate_xml_filename, u'w', u'utf-8')
        except IOError, e:
            self.logger.exception(u'Error opening file: %s' % e)
            self._stats.messages.append(u'ERROR: Error opening file: %s' % e)
            self._stats.status = u'F'
            return False


        has_content = False  # Do we have content for the current entry?
        in_entry = False     # Are we inside an entry?
        url = None           # URL of the current entry.
        static = False       # Does the current entry carry a <static /> tag?

        entry = u''          # The current entry as a string.

        # Will be saved as the portal state later.
        new_urlmap = {}

        # Loop over the newly fetched data. See the docstring above for the
        # four possible cases an entry can fall into.
        for line in fetched_data:
            in_entry = in_entry or line.lstrip().startswith(u'<entry>')
            entry_end = line.lstrip().startswith(u'</entry>')
            has_content = has_content or line.lstrip().startswith(u'<title>')

            if in_entry:
                static = static or line.startswith(u'\t<static />')
                url_start = line.find(u'<url><![CDATA[')
                url_end = line.find(u']]></url>')
                if (url_start + 1) and (url_end + 1):
                    url = line[url_start + 14: url_end]
                if entry_end:
                    self._stats.entries = self._stats.entries + 1
                    if has_content:
                        entry = (u'%s%s'
                                % (entry, u'\t\t<source><![CDATA[%s]]>%s'
                                    % (self._source_name,
                                        u'</source>\n\t</entry>\n')))
                        if not url in self._state.url_map.keys() \
                                and not static:
                            # New entry, case 1
                            self._stats.new_entries \
                                    = self._stats.new_entries + 1
                            self.logger.debug(u'Entry is new: %s' % url)
                        elif not static:
                            # Modified entry, case 2
                            del self._state.url_map[url]
                            self._stats.modified_entries \
                                    = self._stats.modified_entries + 1
                            self.logger.debug(u'Entry was modified: %s' % url)
                        else:
                            # Static entry, case 4
                            del self._state.url_map[url]
                            self._stats.static_entries \
                                    = self._stats.static_entries + 1
                            self.logger.debug(u'Entry is static: %s' % url)

                    elif url in self._state.url_map.keys():
                        # Stub entry, case 3
                        old_data.seek(self._state.url_map[url][0])
                        entry = old_data.read(self._state.url_map[url][1])
                        self._stats.unmodified_entries \
                                = self._stats.unmodified_entries + 1
                        self.logger.debug(u'Entry was a stub, copied '
                                u'existing entry: %s' % url)
                        del self._state.url_map[url]
                    else:
                        self.logger.warn(u'Incomplete entry: \n%s' % entry)
                        self._stats.messages.append(u'WARNING: Incomplete'
                                u' entry: \n%s' % entry)
                        self._stats.status = u'W'
                        in_entry = False
                        has_content = False
                        static = False
                        entry = u''
                        continue

                    # Write the entry to intermediate_data and store its
                    # position and length in the new data.
                    start = intermediate_data.tell()
                    new_urlmap[url] = [start, ]
                    intermediate_data.write(entry)
                    self.logger.debug(u'Entry written')
                    new_urlmap[url].append(
                            intermediate_data.tell() - start)
                    self.logger.debug(u'Entry start: %s, entry length: %s'
                            % (start, intermediate_data.tell() - start))

                    in_entry = False
                    has_content = False
                    static = False
                    entry = u''
                else:
                    entry = u'%s%s' % (entry, line)


        # Overwrite the xml file with the consolidated data:
        shutil.move(self._intermediate_xml_filename, self._xml_filename)
        self.logger.debug(u'Moved %s to %s'
                % (self._intermediate_xml_filename, self._xml_filename))
        self._state.url_map = new_urlmap
        self._state.last_query = self._updated
        self.logger.info(u'Consolidated data saved')
        return True
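
For context, here is a minimal, hypothetical sketch of how this plugin is driven. According to the class docstring, L{xmlgetter.controller} instantiates L{SyncPlugin_portal} for every entry in L{config.PORTALS} and calls the inherited L{run()} method. The shape of C{PORTALS} (shown here as a mapping of source names to C{remoteSyncQueryXML} URLs) and the C{run_all_portals} helper are assumptions for illustration only; they are not part of this module or of the actual controller code.

# Hypothetical driver sketch -- not part of buildxml.plugins.portal.
# Assumes config.PORTALS maps source names to remoteSyncQueryXML URLs;
# the real xmlgetter.controller may differ.
from config import PORTALS
from buildxml.plugins.portal import SyncPlugin_portal


def run_all_portals(no_net=False):
    """Instantiate and run the portal plugin for every configured portal."""
    for source_name, url in PORTALS.items():
        plugin = SyncPlugin_portal(source_name, url, NO_NET=no_net)
        plugin.run()  # run() is inherited from BaseSyncPlugin


if __name__ == '__main__':
    run_all_portals()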