from SimPy.Simulation import *
from random import Random,expovariate,uniform
import time,os,math,difflib
from pysqlite2 import dbapi2 as sql
TABLE_TEST_CREATE = """CREATE TABLE tests
(id integer PRIMARY KEY,
type varchar(20),
period integer,
sleep float,
pc float,
feed_count integer)
"""
TABLE_DATA_CREATE = """CREATE TABLE data
(id integer PRIMARY KEY,
test integer,
interface varchar(20),
sent integer,
recv integer,
wakeups integer,
failures integer
)
"""
feeds={}
feeds['Slashdot']="http://rss.slashdot.org/Slashdot/slashdot";
feeds['Engadget']="http://www.engadget.com/rss.xml";
feeds['CNN Top']="http://rss.cnn.com/rss/cnn_topstories.rss";
feeds['CNN Tech']="http://rss.cnn.com/rss/cnn_tech.rss";
feeds['del.icio.us']="http://del.icio.us/rss/jblebrun";
feeds['BoingBoing']="http://feeds.feedburner.com/boingboing/iBag";
feeds['Scripting']="http://www.scripting.com/rss.xml";
feeds['Wired Top']="http://www.wired.com/news_drop/netcenter/netcenter.rdf";
feeds['Guardian']="http://www.guardian.co.uk/rss/1,,,00.xml";
feeds['SciAm']="http://xml.newsisfree.com/feeds/39/1439.xml";
feeds['kuro5hin']="http://www.kuro5hin.org/backend.rdf";
feeds['NYT']="http://partners.userland.com/nytRss/technology.xml";
feeds['Salon']="http://www.salon.com/feed/RDF/salon_use.rdf";
feeds['WorkBench']="http://www.cadenhead.org/workbench/rss.xml";
feeds['CNET']="http://news.com.com/2547-1_3-0-5.xml";
basedir = "/home/jblebrun/mifeeds/"
basetime = time.mktime((2006,12,31,2,00,00,-1,-1,-1))
endtime = time.mktime((2007,1,1,2,00,00,-1,-1,-1))
runtime = endtime-basetime
def mytime(ctime):
return time.strftime("%Y-%m-%d_%H:%M:%S",time.gmtime(ctime))
class Request:
def __init__(self, name, body=[], last_modified=None):
self.name=name
self.body=body
self.last_modified=last_modified
class Response:
def __init__(self, name, code, body=[]):
self.name=name
self.code=code
self.body=body
class Interface(Process):
""" This is a network interface
It has a parameters:
*probability of connection available: Pc
*probability of being asleep: Ps
It keeps track of:
*packets received (including control)
*packets sent (including control)
"""
def __init__(self):
Process.__init__(self)
self.msgReceived=SimEvent("msgReceived")
self.msgSent=SimEvent("msgSent")
self.connectionFailed=SimEvent("connectionFailed")
self.mSent = Tally(name="Sent")
self.mRecv = Tally(name="Received")
self.mWakeUps = Tally(name="WakeUps")
self.mFailures = Tally(name="Failures")
def listen(self,psleep,pconnected,pidle):
yield hold,self,1
self.psleep=psleep
self.pconnected=pconnected
self.pidle=pidle
while True:
yield waitevent, self, (self.msgSent, self.msgReceived)
for ev in self.eventsFired:
bodylen = len(ev.signalparam.body)
if ev.name=="msgSent":
#ACKs
self.mRecv.observe(math.ceil(float(bodylen)/1500)*66)
self.mSent.observe(bodylen)
if ev.name=="msgReceived":
self.mRecv.observe(bodylen)
self.mSent.observe(math.ceil(float(bodylen)/1500)*66)
#print "Sent:",sent,"Received:",received
#Wait for send signals, decide if asleep or not
#If asleep, try to connect
#If connection fails, return failure
def report(self):
print "Sent:",self.mSent.total()
print "Recv:",self.mRecv.total()
def data(self):
return self.mSent.total(), self.mRecv.total(), self.mWakeUps.total(), self.mFailures.total()
def send(self, msg):
#First, decide if the interface was asleep. If so,
#increment the "wake up" count.
if uniform(0, 1) < self.psleep:
self.mWakeUps.observe(1)
#Next, decide if the connection is available. If not,
#return a connection failed signal, and increment.
if uniform(0,1) > self.pconnected:
self.mFailures.observe(1)
self.connectionFailed.signal()
else:
self.msgSent.signal(msg)
def respond(self, msg):
self.msgReceived.signal(msg)
class Internet(Process):
""" This is the interface that figures out
which feed to return to the requester
It just knows about feeds. It returns them based on keywords."""
def serve(self, iface):
while True:
yield waitevent, self, iface.msgSent
request=iface.msgSent.signalparam
yield hold, self, 0
path = request.name
ctime = now()+basetime
ims = mytime(request.last_modified)
clienttime=mytime(ctime)
name="%s-%s"%(clienttime,path)
mod_name="%s-%s"%(ims,path)
#Read all filenames ending with path
feeds=os.listdir(basedir)
filtered=[]
i=0
for item in feeds:
i+=1
if item[-len(path):]==path and item <= name:
filtered.append(item)
try:
latest=max(filtered)
if latest<mod_name:
#print "Not modified"
response=Response(name, "not modified",[])
else:
#print "Server Returning",basedir+latest
f=open(basedir+latest,'r')
body=f.read()
f.close()
response=Response(name, "OK", body)
except:
#print "Error"
response=Response(name, "Error", [])
latest=None
iface.respond(response)
class Client(Process):
"""This class represents an OCMP client in the caes which use a proxy."""
def listen(self, iface, oob):
while True:
yield hold, self, 0
yield waitevent, self, oob.msgSent, iface.msgReceived
for ev in self.eventsFired:
if ev.name=="msgSent":
iface.send(Request("connect"))
if ev.name=="msgReceived":
pass
#print "received data"
class PollTimer(Process):
"""Maintains polling timers for processes that monitor feeds.
It's a separate class so that the other processes can yield on
a polling timer and msg signals simultaneously"""
def __init__(self, period):
Process.__init__(self)
self.period=period
self.signal=SimEvent("timer")
def go(self):
while True:
yield hold, self, self.period
self.signal.signal()
class Proxy(Process):
"""This is the remote proxy
Receives signals: timer, client request
Send signals: oob data available"""
def fetch(self, period, feeds, net_iface, client_iface=None, client_oob=None, use_diff=False):
timer=PollTimer(period)
activate(timer, timer.go())
last_feed={}
if client_iface:
client_signal=client_iface.msgSent
else:
client_signal=SimEvent("dummy");
while True:
yield waitevent, self, (timer.signal, client_signal)
for ev in self.eventsFired:
#print ev.name
if ev.name=="timer":
#print "Fetching",mytime(basetime+now())
ok=0
self.queue=""
for item in feeds:
net_iface.send(Request(name=item,body=item,last_modified=basetime))
#Receive:
yield waitevent, self, (net_iface.msgReceived, net_iface.connectionFailed)
for ev in self.eventsFired:
if ev.name=="msgReceived":
#print "App got response",interface.msgReceived.signalparam.code
code=net_iface.msgReceived.signalparam.code
body=net_iface.msgReceived.signalparam.body
if code=="OK":
if not last_feed.has_key(item):
last_feed[item]=""
if body != last_feed[item]:
ok+=1
if use_diff:
diff="".join(difflib.unified_diff(body.splitlines(1), last_feed[item].splitlines(1)))
self.queue += diff
else:
self.queue += body
last_feed[item]=body
#Make diff
#Queue difference
elif code=="not modified":
pass
else:
print "Not ok:",code
yield hold, self, 0
if ev.name=="connectionFailed":
#print "Connection failed"
pass
if ok > 0 and client_oob:
#print "Notifying about",ok,"new feeds"
client_oob.send(Request(name="notification",body=range(0,160)))
elif ev.name=="msgSent":
if not client_iface:
print "Uh oh, shouldn't be here!"
#send stuff in queue
#print "Sending to client",len(self.queue)
client_iface.respond(Response("stuff",code=10,body=self.queue))
pass
def test(period, sleep, pc, feed_count):
global feeds
initialize()
i=Interface()
h=Internet()
p=Proxy()
myfeeds={}
myfeeds.update(feeds.items()[0:feed_count])
print "Check %s feeds\n"%len(myfeeds.keys())
activate(i,i.listen(sleep,pc,0))
activate(h,h.serve(iface=i))
activate(p,p.fetch(period=60*period, feeds=myfeeds, net_iface=i))
simulate(until=runtime)
sql="INSERT INTO tests VALUES(NULL, 'no_proxy', %s, %s, %s, %s)"%(period, sleep, pc, feed_count)
print sql
cur.execute(sql)
test=cur.lastrowid
print "Test is:",test
print "Tuple:",(test,)+i.data()
sql="INSERT INTO data VALUES(NULL, %s, 'net', %s, %s, %s, %s)"%((test,)+i.data())
print sql
cur.execute(sql)
def ocmp_test(period, sleep, pc, feed_count, use_diff=False):
global feeds
initialize()
i=Interface()
ih=Interface()
o=Interface()
h=Internet()
c=Client()
p=Proxy()
myfeeds={}
myfeeds.update(feeds.items()[0:feed_count])
print "Check %s feeds\n"%len(myfeeds.keys())
activate(i,i.listen(sleep,pc,0))
activate(ih,ih.listen(0,1,0))
activate(o,o.listen(0,1,0))
activate(h,h.serve(iface=ih))
activate(c,c.listen(iface=i, oob=o))
activate(p,p.fetch(period=60*period, feeds=myfeeds, net_iface=ih, client_iface=i ,client_oob=o, use_diff=use_diff))
simulate(until=runtime)
if use_diff:
type='proxy-diff'
else:
type='proxy'
sql="INSERT INTO tests VALUES(NULL, '%s', %s, %s, %s, %s)"%(type, period, sleep, pc, feed_count)
print sql
cur.execute(sql)
test=cur.lastrowid
print "Test is:",test
print "Tuple:",(test,)+i.data()
sql="INSERT INTO data VALUES(NULL, %s, 'net', %s, %s, %s, %s)"%((test,)+ih.data())
print sql
cur.execute(sql)
sql="INSERT INTO data VALUES(NULL, %s, 'wireless', %s, %s, %s, %s)"%((test,)+ i.data())
print sql
cur.execute(sql)
sql="INSERT INTO data VALUES(NULL, %s, 'oob', %s, %s, %s, %s)"%((test,)+ o.data())
print sql
cur.execute(sql)
def first_test():
#####################################
#First case: no proxy, one interface#
#####################################
#Try 10,30,60,360 minutes
#Vary P(sleep) from 0%-100% (0,25,50,100)
#1 feed, all feeds
#Different Idle terms? 0, 50, 100
#Different Pc: 25, 50, 100
#96 runs
for period in [360, 180, 60, 30, 10]:
for sleep in [0, .25, .50, .75, 1]:
for pc in [0, .25, .50, .75, 1]:
for feed_count in [1, 5, 10]:
print "Test for",period,sleep,pc,feed_count
test(period, sleep, pc, feed_count)
ocmp_test(period, sleep, pc, feed_count)
ocmp_test(period, sleep, pc, feed_count, use_diff=True)
if __name__=="__main__":
#Each of the tests will try various scenarios as specified below.
#The tests will record the number of bytes sent and received.
#Finally, an energy metric will be calculated based
#on the formula in the paper.
global cur
con=sql.connect("ocmp.db")
cur=con.cursor()
try:
cur.execute(TABLE_TEST_CREATE)
except Exception, e:
print "Couldn't create test table:"
print e
try:
cur.execute(TABLE_DATA_CREATE)
except Exception, e:
print "Couldn't create data table:"
print e
first_test()
#ocmp_test(10)
con.commit()