1
2
3 import rpm
4 import types
5 import os
6 import gzip
7 import sys
8 from gzip import write32u, FNAME
9 from urlgrabber.grabber import URLGrabError
10 from zlib import error as zlibError
11
13 print >>sys.stderr, msg
14 errorlog = log
15
18
19
20
21
23 """check a header by opening it and comparing the results to the name and arch
24 we believe it to be for. if it fails raise URLGrabError(-1)"""
25 h = Header_Work(headerfile)
26 fail = 0
27 if h.hdr is None:
28 fail = 1
29 else:
30 if name != h.name() or arch != h.arch():
31 fail = 1
32 if fail:
33 raise URLGrabError(-1, _('Header cannot be opened or does not match %s, %s.') % (name, arch))
34 return
35
36
38 """take a package, check it out by trying to open it, return 1 if its good
39 return 0 if it's not"""
40 ts.sigChecking('md5')
41 fdno = os.open(package, os.O_RDONLY)
42 try:
43 ts.hdrFromFdno(fdno)
44 except rpm.error, e:
45 good = 0
46 else:
47 good = 1
48 os.close(fdno)
49 ts.sigChecking('default')
50
51 if urlgraberror:
52 if not good:
53 raise URLGrabError(-1, _('RPM %s fails md5 check') % (package))
54 else:
55 return
56 else:
57 return good
58
60 """ take a package, check it's sigs, return 0 if they are all fine, return
61 1 if the gpg key can't be found, 2 if the header is in someway damaged,
62 3 if the key is not trusted, 4 if the pkg is not gpg or pgp signed"""
63 ts.sigChecking('default')
64 fdno = os.open(package, os.O_RDONLY)
65 try:
66 hdr = ts.hdrFromFdno(fdno)
67 except rpm.error, e:
68 if str(e) == "public key not availaiable":
69 return 1
70 if str(e) == "public key not available":
71 return 1
72 if str(e) == "public key not trusted":
73 return 3
74 if str(e) == "error reading package header":
75 return 2
76 else:
77 error, siginfo = getSigInfo(hdr)
78 if error == 101:
79 os.close(fdno)
80 del hdr
81 return 4
82 else:
83 del hdr
84 os.close(fdno)
85 return 0
86
88 """checks if a computerhand back signature information and an error code"""
89 string = '%|DSAHEADER?{%{DSAHEADER:pgpsig}}:{%|RSAHEADER?{%{RSAHEADER:pgpsig}}:{%|SIGGPG?{%{SIGGPG:pgpsig}}:{%|SIGPGP?{%{SIGPGP:pgpsig}}:{(none)}|}|}|}|'
90 siginfo = hdr.sprintf(string)
91 if siginfo != '(none)':
92 error = 0
93 sigtype, sigdate, sigid = siginfo.split(',')
94 else:
95 error = 101
96 sigtype = 'MD5'
97 sigdate = 'None'
98 sigid = 'None'
99
100 infotuple = (sigtype, sigdate, sigid)
101 return error, infotuple
102
104 provnames = []
105 provides = header[rpm.RPMTAG_PROVIDENAME]
106 if provides is None:
107 pass
108 elif type(provides) is types.ListType:
109 provnames.extend(provides)
110 else:
111 provnames.append(provides)
112 return provnames
113
115
116
117
118 def rpmOutToStr(arg):
119 if type(arg) != types.StringType and arg != None:
120 arg = str(arg)
121 return arg
122 e1 = rpmOutToStr(e1)
123 v1 = rpmOutToStr(v1)
124 r1 = rpmOutToStr(r1)
125 e2 = rpmOutToStr(e2)
126 v2 = rpmOutToStr(v2)
127 r2 = rpmOutToStr(r2)
128 rc = rpm.labelCompare((e1, v1, r1), (e2, v2, r2))
129 log(6, '%s, %s, %s vs %s, %s, %s = %s' % (e1, v1, r1, e2, v2, r2, rc))
130 return rc
131
132
145
146
148 try:
149 db = rpm.TransactionSet(conf.installroot)
150 except rpm.error, e:
151 errorlog(0, _("Could not open RPM database for reading. Perhaps it is already in use?"))
152 return db
153
154
155
156 __all__ = ["GzipFile","open"]
157
160 self.fileobj.write('\037\213')
161 self.fileobj.write('\010')
162 fname = self.filename[:-3]
163 flags = 0
164 if fname:
165 flags = FNAME
166 self.fileobj.write(chr(flags))
167 write32u(self.fileobj, long(0))
168 self.fileobj.write('\002')
169 self.fileobj.write('\377')
170 if fname:
171 self.fileobj.write(fname + '\000')
172
173
174 -def _gzipOpen(filename, mode="rb", compresslevel=9):
175 return GzipFile(filename, mode, compresslevel)
176
180
182 if self.hdr is None:
183 errorlog(0, _('Got an empty Header, something has gone wrong'))
184
185 sys.exit(1)
186 return self.hdr[tag]
187
189 if self._getTag('sourcepackage') == 1:
190 return 1
191 else:
192 return 0
193
196
199
202
205
208
210 e = self._getTag('epoch')
211 v = self._getTag('version')
212 r = self._getTag('release')
213 return (e, v, r)
214
216 n = self._getTag('name')
217 e = self._getTag('epoch')
218 v = self._getTag('version')
219 r = self._getTag('release')
220 a = self._getTag('arch')
221 return (n, e, v, r, a)
222
224
225
226 (name, epoch, ver, rel, arch) = self.nevra()
227 if epoch is None:
228 epoch = '0'
229 if self.isSource():
230 headerfn = "%s/%s-%s-%s-%s.src.hdr" % (headerdir, name, epoch, ver, rel)
231 else:
232 headerfn = "%s/%s-%s-%s-%s.%s.hdr" % (headerdir, name, epoch, ver, rel, arch)
233
234 if compress:
235 headerout = _gzipOpen(headerfn, "w")
236 else:
237 headerout = open(headerfn, "w")
238 headerout.write(self.hdr.unload(1))
239 headerout.close()
240 return(headerfn)
241
243 """for operating on hdrs in and out of the rpmdb
244 if the first arg is a string then it's a filename
245 otherwise it's an rpm hdr"""
247 if type(header) is types.StringType:
248 try:
249 fd = gzip.open(header, 'r')
250 try:
251 h = rpm.headerLoad(fd.read())
252 except rpm.error, e:
253 errorlog(0,_('Damaged Header %s') % header)
254 h = None
255 except IOError,e:
256 fd = open(header, 'r')
257 try:
258 h = rpm.headerLoad(fd.read())
259 except rpm.error, e:
260 errorlog(0,_('Damaged Header %s') % header)
261 h = None
262 except ValueError, e:
263 errorlog(0,_('Damaged Header %s') % header)
264 h = None
265 except zlibError, e:
266 errorlog(0,_('Damaged Header %s') % header)
267 h = None
268 fd.close()
269 else:
270 h = header
271 self.hdr = h
272
273
276 ts.setVSFlags(~(rpm._RPMVSF_NOSIGNATURES))
277 fd = os.open(rpmfn, os.O_RDONLY)
278 try:
279 self.hdr = ts.hdrFromFdno(fd)
280 except rpm.error, e:
281 errorlog(0, _('Error opening rpm %s - error %s') % (rpmfn, e))
282 self.hdr = None
283 os.close(fd)
284
286 """This should operate on groups of headers/matches/etc in the rpmdb - ideally it will
287 operate with a list of the Base objects above, so I can refer to any one object there
288 not sure the best way to do this yet, more thinking involved"""
290 try:
291 if conf.installroot:
292 if conf.installroot != '/':
293 dbPath = conf.installroot
294 except NameError, e:
295 pass
296
297 self.ts = rpm.TransactionSet(dbPath)
298
299 self.methods = ['addInstall', 'addErase', 'run', 'check', 'order', 'hdrFromFdno',
300 'closeDB', 'dbMatch', 'setFlags', 'setVSFlags', 'setProbFilter']
301
303 if attribute in self.methods:
304 return getattr(self.ts, attribute)
305 else:
306 raise AttributeError, attribute
307
308 - def match(self, tag = None, search = None, mire = None):
309 """hands back a list of Header_Work objects"""
310 hwlist = []
311
312 if mire is None and tag is None and search is None:
313 hdrlist = self.ts.dbMatch()
314
315 else:
316
317 if mire == None:
318 hdrlist = self.ts.dbMatch(tag, search)
319 else:
320
321 if mire == 'glob':
322 hdrlist = self.ts.dbMatch()
323 hdrlist.pattern(tag, rpm.RPMMIRE_GLOB, search)
324 elif mire == 'regex':
325 hdrlist = self.ts.dbMatch()
326 hdrlist.pattern(tag, rpm.RPMMIRE_REGEX, search)
327 elif mire == 'strcmp':
328 hdrlist = self.ts.dbMatch()
329 hdrlist.pattern(tag, rpm.RPMMIRE_STRCMP, search)
330 else:
331 hdrlist = self.ts.dbMatch()
332 hdrlist.pattern(tag, rpm.RPMMIRE_DEFAULT, search)
333
334 for hdr in hdrlist:
335 hdrobj = Header_Work(hdr)
336 hwlist.append(hdrobj)
337 return hwlist
338
340 """pass type of check you want to occur, default is to have them off"""
341 if sig == 'md5':
342
343 self.ts.setVSFlags(~(rpm.RPMVSF_NOMD5|rpm.RPMVSF_NEEDPAYLOAD))
344 elif sig == 'none':
345
346 self.ts.setVSFlags(~(rpm._RPMVSF_NOSIGNATURES))
347 elif sig == 'default':
348
349 self.ts.setVSFlags(rpm.RPMVSF_DEFAULT)
350 else:
351 raise AttributeError, sig
352