Package rpmUtils :: Module oldUtils
[hide private]
[frames] | no frames]

Source Code for Module rpmUtils.oldUtils

  1  #!/usr/bin/python -tt 
  2   
  3  import rpm 
  4  import types 
  5  import os 
  6  import gzip 
  7  import sys 
  8  from gzip import write32u, FNAME 
  9  from urlgrabber.grabber import URLGrabError 
 10  from zlib import error as zlibError 
 11   
12 -def log(num, msg):
13 print >>sys.stderr, msg
14 errorlog = log 15
16 -def _(msg):
17 return msg
18 19 20 # pylint: disable-msg=E0602 21
22 -def checkheader(headerfile, name, arch):
23 """check a header by opening it and comparing the results to the name and arch 24 we believe it to be for. if it fails raise URLGrabError(-1)""" 25 h = Header_Work(headerfile) 26 fail = 0 27 if h.hdr is None: 28 fail = 1 29 else: 30 if name != h.name() or arch != h.arch(): 31 fail = 1 32 if fail: 33 raise URLGrabError(-1, _('Header cannot be opened or does not match %s, %s.') % (name, arch)) 34 return
35 36
37 -def checkRpmMD5(package, urlgraberror=0):
38 """take a package, check it out by trying to open it, return 1 if its good 39 return 0 if it's not""" 40 ts.sigChecking('md5') 41 fdno = os.open(package, os.O_RDONLY) 42 try: 43 ts.hdrFromFdno(fdno) 44 except rpm.error, e: 45 good = 0 46 else: 47 good = 1 48 os.close(fdno) 49 ts.sigChecking('default') 50 51 if urlgraberror: 52 if not good: 53 raise URLGrabError(-1, _('RPM %s fails md5 check') % (package)) 54 else: 55 return 56 else: 57 return good
58
59 -def checkSig(package):
60 """ take a package, check it's sigs, return 0 if they are all fine, return 61 1 if the gpg key can't be found, 2 if the header is in someway damaged, 62 3 if the key is not trusted, 4 if the pkg is not gpg or pgp signed""" 63 ts.sigChecking('default') 64 fdno = os.open(package, os.O_RDONLY) 65 try: 66 hdr = ts.hdrFromFdno(fdno) 67 except rpm.error, e: 68 if str(e) == "public key not availaiable": 69 return 1 70 if str(e) == "public key not available": 71 return 1 72 if str(e) == "public key not trusted": 73 return 3 74 if str(e) == "error reading package header": 75 return 2 76 else: 77 error, siginfo = getSigInfo(hdr) 78 if error == 101: 79 os.close(fdno) 80 del hdr 81 return 4 82 else: 83 del hdr 84 os.close(fdno) 85 return 0
86
87 -def getSigInfo(hdr):
88 """checks if a computerhand back signature information and an error code""" 89 string = '%|DSAHEADER?{%{DSAHEADER:pgpsig}}:{%|RSAHEADER?{%{RSAHEADER:pgpsig}}:{%|SIGGPG?{%{SIGGPG:pgpsig}}:{%|SIGPGP?{%{SIGPGP:pgpsig}}:{(none)}|}|}|}|' 90 siginfo = hdr.sprintf(string) 91 if siginfo != '(none)': 92 error = 0 93 sigtype, sigdate, sigid = siginfo.split(',') 94 else: 95 error = 101 96 sigtype = 'MD5' 97 sigdate = 'None' 98 sigid = 'None' 99 100 infotuple = (sigtype, sigdate, sigid) 101 return error, infotuple
102
103 -def getProvides(header):
104 provnames = [] 105 provides = header[rpm.RPMTAG_PROVIDENAME] 106 if provides is None: 107 pass 108 elif type(provides) is types.ListType: 109 provnames.extend(provides) 110 else: 111 provnames.append(provides) 112 return provnames
113
114 -def compareEVR((e1, v1, r1), (e2, v2, r2)):
115 # return 1: a is newer than b 116 # 0: a and b are the same version 117 # -1: b is newer than a 118 def rpmOutToStr(arg): 119 if type(arg) != types.StringType and arg != None: 120 arg = str(arg) 121 return arg
122 e1 = rpmOutToStr(e1) 123 v1 = rpmOutToStr(v1) 124 r1 = rpmOutToStr(r1) 125 e2 = rpmOutToStr(e2) 126 v2 = rpmOutToStr(v2) 127 r2 = rpmOutToStr(r2) 128 rc = rpm.labelCompare((e1, v1, r1), (e2, v2, r2)) 129 log(6, '%s, %s, %s vs %s, %s, %s = %s' % (e1, v1, r1, e2, v2, r2, rc)) 130 return rc 131 132
133 -def formatRequire (name, version, flags):
134 if flags: 135 if flags & (rpm.RPMSENSE_LESS | rpm.RPMSENSE_GREATER | rpm.RPMSENSE_EQUAL): 136 name = name + ' ' 137 if flags & rpm.RPMSENSE_LESS: 138 name = name + '<' 139 if flags & rpm.RPMSENSE_GREATER: 140 name = name + '>' 141 if flags & rpm.RPMSENSE_EQUAL: 142 name = name + '=' 143 name = name + ' %s' % version 144 return name
145 146
147 -def openrpmdb():
148 try: 149 db = rpm.TransactionSet(conf.installroot) 150 except rpm.error, e: 151 errorlog(0, _("Could not open RPM database for reading. Perhaps it is already in use?")) 152 return db
153 154 155 # this is done to make the hdr writing _more_ sane for rsync users especially 156 __all__ = ["GzipFile","open"] 157
158 -class GzipFile(gzip.GzipFile):
159 - def _write_gzip_header(self):
160 self.fileobj.write('\037\213') # magic header 161 self.fileobj.write('\010') # compression method 162 fname = self.filename[:-3] 163 flags = 0 164 if fname: 165 flags = FNAME 166 self.fileobj.write(chr(flags)) 167 write32u(self.fileobj, long(0)) 168 self.fileobj.write('\002') 169 self.fileobj.write('\377') 170 if fname: 171 self.fileobj.write(fname + '\000')
172 173
174 -def _gzipOpen(filename, mode="rb", compresslevel=9):
175 return GzipFile(filename, mode, compresslevel)
176
177 -class RPM_Base_Work:
178 - def __init__(self):
179 self.hdr = None
180
181 - def _getTag(self, tag):
182 if self.hdr is None: 183 errorlog(0, _('Got an empty Header, something has gone wrong')) 184 #FIXME should raise a yum error here 185 sys.exit(1) 186 return self.hdr[tag]
187
188 - def isSource(self):
189 if self._getTag('sourcepackage') == 1: 190 return 1 191 else: 192 return 0
193
194 - def name(self):
195 return self._getTag('name')
196
197 - def arch(self):
198 return self._getTag('arch')
199
200 - def epoch(self):
201 return self._getTag('epoch')
202
203 - def version(self):
204 return self._getTag('version')
205
206 - def release(self):
207 return self._getTag('release')
208
209 - def evr(self):
210 e = self._getTag('epoch') 211 v = self._getTag('version') 212 r = self._getTag('release') 213 return (e, v, r)
214
215 - def nevra(self):
216 n = self._getTag('name') 217 e = self._getTag('epoch') 218 v = self._getTag('version') 219 r = self._getTag('release') 220 a = self._getTag('arch') 221 return (n, e, v, r, a)
222
223 - def writeHeader(self, headerdir, compress):
224 # write the header out to a file with the format: name-epoch-ver-rel.arch.hdr 225 # return the name of the file it just made - no real reason :) 226 (name, epoch, ver, rel, arch) = self.nevra() 227 if epoch is None: 228 epoch = '0' 229 if self.isSource(): 230 headerfn = "%s/%s-%s-%s-%s.src.hdr" % (headerdir, name, epoch, ver, rel) 231 else: 232 headerfn = "%s/%s-%s-%s-%s.%s.hdr" % (headerdir, name, epoch, ver, rel, arch) 233 234 if compress: 235 headerout = _gzipOpen(headerfn, "w") 236 else: 237 headerout = open(headerfn, "w") 238 headerout.write(self.hdr.unload(1)) 239 headerout.close() 240 return(headerfn)
241
242 -class Header_Work(RPM_Base_Work):
243 """for operating on hdrs in and out of the rpmdb 244 if the first arg is a string then it's a filename 245 otherwise it's an rpm hdr"""
246 - def __init__(self, header):
247 if type(header) is types.StringType: 248 try: 249 fd = gzip.open(header, 'r') 250 try: 251 h = rpm.headerLoad(fd.read()) 252 except rpm.error, e: 253 errorlog(0,_('Damaged Header %s') % header) 254 h = None 255 except IOError,e: 256 fd = open(header, 'r') 257 try: 258 h = rpm.headerLoad(fd.read()) 259 except rpm.error, e: 260 errorlog(0,_('Damaged Header %s') % header) 261 h = None 262 except ValueError, e: 263 errorlog(0,_('Damaged Header %s') % header) 264 h = None 265 except zlibError, e: 266 errorlog(0,_('Damaged Header %s') % header) 267 h = None 268 fd.close() 269 else: 270 h = header 271 self.hdr = h
272 273
274 -class RPM_Work(RPM_Base_Work):
275 - def __init__(self, rpmfn):
276 ts.setVSFlags(~(rpm._RPMVSF_NOSIGNATURES)) 277 fd = os.open(rpmfn, os.O_RDONLY) 278 try: 279 self.hdr = ts.hdrFromFdno(fd) 280 except rpm.error, e: 281 errorlog(0, _('Error opening rpm %s - error %s') % (rpmfn, e)) 282 self.hdr = None 283 os.close(fd)
284
285 -class Rpm_Ts_Work:
286 """This should operate on groups of headers/matches/etc in the rpmdb - ideally it will 287 operate with a list of the Base objects above, so I can refer to any one object there 288 not sure the best way to do this yet, more thinking involved"""
289 - def __init__(self, dbPath='/'):
290 try: 291 if conf.installroot: 292 if conf.installroot != '/': 293 dbPath = conf.installroot 294 except NameError, e: 295 pass 296 297 self.ts = rpm.TransactionSet(dbPath) 298 299 self.methods = ['addInstall', 'addErase', 'run', 'check', 'order', 'hdrFromFdno', 300 'closeDB', 'dbMatch', 'setFlags', 'setVSFlags', 'setProbFilter']
301
302 - def __getattr__(self, attribute):
303 if attribute in self.methods: 304 return getattr(self.ts, attribute) 305 else: 306 raise AttributeError, attribute
307
308 - def match(self, tag = None, search = None, mire = None):
309 """hands back a list of Header_Work objects""" 310 hwlist = [] 311 # hand back the whole list of hdrs 312 if mire is None and tag is None and search is None: 313 hdrlist = self.ts.dbMatch() 314 315 else: 316 #just do a non-mire'd search 317 if mire == None: 318 hdrlist = self.ts.dbMatch(tag, search) 319 else: 320 # mire search 321 if mire == 'glob': 322 hdrlist = self.ts.dbMatch() 323 hdrlist.pattern(tag, rpm.RPMMIRE_GLOB, search) 324 elif mire == 'regex': 325 hdrlist = self.ts.dbMatch() 326 hdrlist.pattern(tag, rpm.RPMMIRE_REGEX, search) 327 elif mire == 'strcmp': 328 hdrlist = self.ts.dbMatch() 329 hdrlist.pattern(tag, rpm.RPMMIRE_STRCMP, search) 330 else: 331 hdrlist = self.ts.dbMatch() 332 hdrlist.pattern(tag, rpm.RPMMIRE_DEFAULT, search) 333 334 for hdr in hdrlist: 335 hdrobj = Header_Work(hdr) 336 hwlist.append(hdrobj) 337 return hwlist
338
339 - def sigChecking(self, sig):
340 """pass type of check you want to occur, default is to have them off""" 341 if sig == 'md5': 342 #turn off everything but md5 - and we need to the check the payload 343 self.ts.setVSFlags(~(rpm.RPMVSF_NOMD5|rpm.RPMVSF_NEEDPAYLOAD)) 344 elif sig == 'none': 345 # turn off everything - period 346 self.ts.setVSFlags(~(rpm._RPMVSF_NOSIGNATURES)) 347 elif sig == 'default': 348 # set it back to the default 349 self.ts.setVSFlags(rpm.RPMVSF_DEFAULT) 350 else: 351 raise AttributeError, sig
352