"""
Eithon Cadag, University of Washington, 2009
fuzzy.py is released under GPL and as-is!

A small fuzzy-logic toolkit: membership curves are piecewise-linear
functions built from Point and Line objects, membership degrees are
FuzzyValue objects, and FuzzyExpr / FuzzyRule combine them.

The module is written so that it runs on both Python 2 and Python 3.
"""


## temp class needs work
class FuzzyRule:
    """Pair a predicate curve with an expression to form a rule.

    ``expression`` is called with a crisp value and its result is handed
    to ``predicate.cut``; the result of that cut is the rule's output.
    """

    def __init__(self, predicate, expression):
        self.predicate = predicate
        self.expression = expression

    def __call__(self, *args):
        """Evaluate the rule on the first positional argument."""
        return self.eval(args[0])

    def eval(self, value):
        """Return ``predicate.cut(expression(value))``."""
        tval = self.expression(value)
        return self.predicate.cut(tval)


## temp class needs work
class FuzzyVariable:
    """A named collection of membership curves."""

    def __init__(self, name):
        self.name = name
        self.curves = {}

    def add_curve(self, curve, name):
        """Register ``curve`` (must be a Curve) under ``name``."""
        assert isinstance(curve, Curve)
        self.curves[name] = curve

    def membership(self, value):
        """Return a dict mapping each curve name to that curve evaluated at ``value``."""
        result = {}
        for key in self.curves:
            result[key] = self.curves[key](value)
        return result


class FuzzyExpr:
    """Wrap a callable ``disc`` so expressions can be combined with &, | and ~.

    The combining operators return plain functions of one value, not new
    FuzzyExpr instances.
    """

    def __init__(self, disc):
        self.disc = disc

    def __call__(self, *args):
        return self.disc(*args)

    def __and__(self, other):
        def and_(val):
            return self.disc(val) & other.disc(val)
        return and_

    def __or__(self, other):
        def or_(val):
            return self.disc(val) | other.disc(val)
        return or_

    def __invert__(self):
        def inv_(val):
            return (~self.disc)(val)
        return inv_


#updated
class Point:
    """A 2-D point; both coordinates are stored rounded to two decimals.

    ``==`` compares both coordinates, while the ordering operators
    (<, <=, >, >=) compare the x coordinate only.
    """

    def __init__(self, x, y):
        ## HACK Alert: find out why stuff getting passed here is sometimes > 1 and < 0
        # y is clamped into [0, 1] instead of being rejected.  The original
        # range check that followed this clamp could never fire and was
        # removed.
        if y > 1:
            y = 1
        elif y < 0:
            y = 0
        ## END HACK
        self.x = round(float(x), 2)
        self.y = round(float(y), 2)

    def __eq__(self, other):
        return self.x == other.x and self.y == other.y

    def __repr__(self):
        return "(Point %s %s)" % (self.x, self.y)

    def __gt__(self, other):
        return self.x > other.x

    def __lt__(self, other):
        return self.x < other.x

    def __ge__(self, other):
        return self.x >= other.x

    def __le__(self, other):
        return self.x <= other.x

    def __invert__(self):
        """Return a new Point with the same x and y replaced by ``1 - y``."""
        return Point(self.x, 1 - self.y)


class FuzzyValue:
    """A membership degree; the constructor asserts it lies in [0.0, 1.0].

    ``&`` returns the operand with the smaller value, ``|`` the operand
    with the larger value and ``~`` the complement ``1.0 - value``.
    Arithmetic operators build a new FuzzyValue, so a result outside
    [0.0, 1.0] trips the constructor's assertion.
    """

    def __init__(self, y):
        assert y <= 1.0 and y >= 0.0
        self.value = y

    def __gt__(self, other):
        return self.value > other.value

    def __lt__(self, other):
        return self.value < other.value

    def __le__(self, other):
        return self.value <= other.value

    def __ge__(self, other):
        return self.value >= other.value

    def __invert__(self):
        return FuzzyValue(1.0 - self.value)

    def __add__(self, other):
        return FuzzyValue(self.value + other.value)

    def __sub__(self, other):
        return FuzzyValue(self.value - other.value)

    def __div__(self, other):
        return FuzzyValue(self.value / other.value)

    # Python 3 dispatches the "/" operator to __truediv__, not __div__.
    __truediv__ = __div__

    def __mul__(self, other):
        return FuzzyValue(self.value * other.value)

    def __and__(self, other):
        if self.value > other.value:
            return other
        else:
            return self

    def __or__(self, other):
        if self.value < other.value:
            return other
        else:
            return self

    def __repr__(self):
        return "<" + str(self.value) + ">"


#updated
class Line:
    """A line segment between two Points, with cached slope and intercept."""

    def __init__(self, start, stop):
        # A plain condition is used rather than assert/except so that the
        # workaround below also runs when assertions are disabled (-O).
        if not start.x < stop.x:
            ## HACK Alert - this class is getting passed start.x == stop.x from Curve
            # NOTE: this mutates the caller's Point in place.
            start.x = start.x - .01
            ## END HACK
        self.start = start
        self.stop = stop
        self.slope = calc_slope(start, stop)
        self.intercept = calc_intercept(start, stop)

    def y_intercept(self):
        return self.intercept

    def x_bounded(self, val):
        """Return True if ``val`` lies in [start.x, stop.x]."""
        return val >= self.start.x and val <= self.stop.x

    def y_bounded(self, val):
        """Return True if ``val`` lies between start.y and stop.y, in either order."""
        return val >= self.start.y and val <= self.stop.y or \
            val <= self.start.y and val >= self.stop.y

    def xy_bounded(self, val):
        """Return True if the Point ``val`` is inside the segment's bounding box."""
        return self.x_bounded(val.x) and self.y_bounded(val.y)

    def get_slope(self):
        return self.slope

    def __call__(self, *args):
        """Evaluate the line at each x in ``args`` that lies within its x range.

        Values outside the x range are skipped.  With exactly one argument
        a list is returned (possibly empty); otherwise a tuple.
        """
        result = []
        for x in args:
            if self.x_bounded(x):
                result.append(FuzzyValue(round(float(self.slope * x + self.intercept), 2)))
        if len(args) == 1:
            return result
        else:
            return tuple(result)

    def __repr__(self):
        return "(Line %s %s)" % (self.start, self.stop)

    def __eq__(self, other):
        return other.start == self.start and other.stop == self.stop

    def __invert__(self):
        return Line(~self.start, ~self.stop)


class Curve:
    """A piecewise-linear curve made of Lines ordered by x.

    The centroid is computed once at construction time and is what
    ``defuzz`` returns.
    """

    def __init__(self, *lines):
        self.lines = [lines[0]]
        post = lines[0]
        for x in range(1, len(lines)):
            # Point ordering compares x only: each line must start at or
            # after the x at which the previous line stops.
            if lines[x].start >= post.stop:
                self.lines.append(lines[x])
                post = lines[x]
            else:
                raise ValueError("Lines of a Curve must be ordered by non-decreasing x.")
        self.centroid = self._centroid()

    def cut(self, y):
        """Return this curve AND-ed with a horizontal line at height ``y.value``."""
        assert isinstance(y, FuzzyValue), "Curve.cut() method takes a FuzzyValue as input"
        cutter = Curve(Line(Point(self.lines[0].start.x, y.value),
                            Point(self.lines[-1].stop.x, y.value)))
        return self & cutter

    def defuzz(self):
        return self.centroid

    def __call__(self, *args):
        """Evaluate the curve at each x in ``args``.

        For each x the first line whose x range contains it is used; x
        values covered by no line contribute nothing.  Returns the single
        FuzzyValue when one argument was given and matched, the int ``0``
        when nothing matched, and a tuple of FuzzyValues otherwise.
        """
        res = []
        for x in args:
            for ln in self.lines:
                if ln.x_bounded(x):
                    res.append(ln(x)[0])
                    break
        if len(args) == 1 and res != []:
            return res[0]
        elif res == []:
            return 0
        else:
            return tuple(res)

    def _centroid(self):
        """Numerically approximate the x coordinate of the curve's centroid.

        Returns the value rounded to two decimals, or 0 when the sampled
        area is zero.
        """
        start = self.lines[0].start.x
        end = self.lines[-1].stop.x
        curve_range = end - start
        incr = start
        adder = curve_range / (100 + 10.0 * len(self.lines))  # ~10 segment cuts per line in the curve
        numer, denom = 0, 0
        while incr < end:
            y_val = self(incr).value
            numer += y_val * incr * adder
            denom += y_val * adder
            incr += adder
        try:
            return round(float(numer / denom), 2)
        except ZeroDivisionError:
            return 0

    def _make_pt_list(self, other, fnc):
        """Split the lines of both curves at their crossings and index them.

        Returns ``(ln_graph, head)``: ``ln_graph`` maps ``str(start point)``
        to the list of stop points of the lines starting there, and
        ``head`` is whichever curve's first start point ``fnc`` prefers.
        """
        ## make all lines graphable
        all_lines = self.lines + other.lines
        cleared = []
        while all_lines != []:
            current = all_lines[0]
            for x in range(len(all_lines)):
                if all_lines[x] == current:
                    continue
                intersect = get_intersect(current, all_lines[x])
                if intersect:
                    # Replace both crossing lines by their four halves and
                    # start over with the new head of the work list.
                    result = intersection_splitter(all_lines[x], current)
                    all_lines.extend(result)
                    del all_lines[x]
                    del all_lines[0]
                    break
            if current in all_lines:
                # No crossing found for this line: it is final.
                cleared.append(current)
                del all_lines[0]
        ## build graph
        ln_graph = {}
        if fnc(self.lines[0].start, other.lines[0].start):
            head = self.lines[0].start
        else:
            head = other.lines[0].start
        for x in cleared:
            try:
                ln_graph[str(x.start)].append(x.stop)
            except KeyError:
                ln_graph[str(x.start)] = [x.stop]
        return ln_graph, head

    def __and__(self, other):
        """Combine two curves, following the lower branch at every fork."""
        def and_op(me, them):
            if me.y < them.y:
                return True
            return False
        ln_graph, head = self._make_pt_list(other, and_op)
        accumulator = [head]
        self._traverse(ln_graph, str(head), accumulator, and_op)
        return curve(*accumulator)

    def __or__(self, other):
        """Combine two curves, following the higher branch at every fork."""
        def or_op(me, them):
            if me.y > them.y:
                return True
            return False
        ln_graph, head = self._make_pt_list(other, or_op)
        accumulator = [head]
        self._traverse(ln_graph, str(head), accumulator, or_op)
        return curve(*accumulator)

    def __invert__(self):
        invc = [~l for l in self.lines]
        return Curve(*invc)

    def _traverse(self, graph, head, accumulator, fnc):
        """Walk ``graph`` from key ``head``, appending the chosen points to ``accumulator``.

        At each node the successor preferred by ``fnc`` is taken; the walk
        ends at a node that has no entry in ``graph``.
        """
        try:
            graph[head]
        except KeyError:
            return
        top = None
        for g in graph[head]:
            if not top or fnc(g, top):
                top = g
        accumulator.append(top)
        self._traverse(graph, str(top), accumulator, fnc)

    def __repr__(self):
        # The joined string has no trailing separator, so nothing is
        # stripped from it (the old ln[:-1] cut off the last ")").
        ln = ' '.join([str(x) for x in self.lines])
        return "(Curve %s)" % (ln)


def intersection_splitter(line_a, line_b):
    """Split two crossing lines at their intersection point.

    Returns the four resulting Lines (a-start, b-start, a-stop, b-stop
    halves), or None when the lines are equal or have the same slope.
    """
    intersect = get_intersect(line_a, line_b)
    if line_a != line_b and line_a.get_slope() != line_b.get_slope():
        # print line_a, line_b, intersect
        return Line(line_a.start, intersect), Line(line_b.start, intersect), \
            Line(intersect, line_a.stop), Line(intersect, line_b.stop)
    else:
        return None


def fexpr(curve):
    """Shorthand constructor for FuzzyExpr."""
    return FuzzyExpr(curve)


def fv(val):
    """Shorthand constructor for FuzzyValue."""
    return FuzzyValue(val)


def fr(pred, expre):
    """Shorthand constructor for FuzzyRule."""
    return FuzzyRule(pred, expre)


def curve(*pts):
    """Build a Curve by joining consecutive Points with Lines."""
    post = pts[0]
    lines = []
    for x in range(1, len(pts)):
        lines.append(Line(post, pts[x]))
        post = pts[x]
    return Curve(*lines)


def and_operation(line_a, line_b):
    """Return a two-line Curve through the intersection of the two lines.

    The line with the smaller y intercept supplies the start point and the
    other supplies the stop point.  Returns None when the lines do not
    intersect or have equal y intercepts.
    """
    intersect = get_intersect(line_a, line_b)
    if intersect and line_a.y_intercept() > line_b.y_intercept():
        begin = line_b.start
        end = line_a.stop
    elif intersect and line_a.y_intercept() < line_b.y_intercept():
        begin = line_a.start
        end = line_b.stop
    else:
        return None
    return Curve(Line(begin, intersect), Line(intersect, end))


def or_operation(line_a, line_b):
    """Return a two-line Curve through the intersection of the two lines.

    The line with the larger y intercept supplies the start point and the
    other supplies the stop point.  Returns None when the lines do not
    intersect or have equal y intercepts.
    """
    intersect = get_intersect(line_a, line_b)
    if intersect and line_a.y_intercept() < line_b.y_intercept():
        begin = line_b.start
        end = line_a.stop
    elif intersect and line_a.y_intercept() > line_b.y_intercept():
        begin = line_a.start
        end = line_b.stop
    else:
        return None
    return Curve(Line(begin, intersect), Line(intersect, end))


def get_intersect(line_a, line_b):
    """Return the Point where two segments cross, or None.

    Equal lines, lines of equal slope and lines that share an endpoint
    are treated as not intersecting.  The crossing must lie inside the
    bounding box of both segments.
    """
    if line_a == line_b or line_a.get_slope() == line_b.get_slope() or \
            line_a.stop == line_b.start or line_b.stop == line_a.start or \
            line_a.start == line_b.start or line_b.stop == line_a.stop:
        return None
    coeff = line_b.y_intercept() - line_a.y_intercept()
    slope = line_a.get_slope() - line_b.get_slope()
    x_int = coeff / slope
    y_int = line_a.get_slope() * x_int + line_a.y_intercept()
    point = Point(x_int, y_int)
    if line_a.xy_bounded(point) and line_b.xy_bounded(point):
        return point
    else:
        return None


#updated
def calc_intercept(pt_a, pt_b):
    """Return the y intercept of the line through ``pt_a`` and ``pt_b``."""
    slope = calc_slope(pt_a, pt_b)
    return pt_a.y - slope * pt_a.x


#updated
def calc_slope(pt_a, pt_b):
    """Return the slope between two points, or 0 when their x values are equal."""
    if pt_b.x - pt_a.x != 0:
        return (pt_b.y - pt_a.y) / (pt_b.x - pt_a.x)
    return 0


if __name__ == '__main__':
    # expression definitions
    heavyInhibition = fexpr(curve(Point(-1,1),Point(-.9,1),Point(-.6,0),Point(1,0)))
    moderateInhibition = fexpr(curve(Point(-1,0),Point(-.9,0),Point(-.6,1),Point(-.3,0),Point(1,0)))
    lightInhibition = fexpr(curve(Point(-1,0),Point(-.6,0), Point(-.3,1), Point(0,0),Point(1,0)))
    normalActivity = fexpr(curve(Point(-1,0),Point(-.3,0),Point(0,1),Point(.3,0),Point(1,0)))
    lightInduction = fexpr(curve(Point(-1,0),Point(0,0), Point(.3,1), Point(.6,0),Point(1,0)))
    moderateInduction = fexpr(curve(Point(-1,0),Point(.3,0),Point(.6,1),Point(.9,0),Point(1,0)))
    heavyInduction = fexpr(curve(Point(-1,0),Point(.6,0),Point(.9,1),Point(1,1)))

    # curve definitions
    inhibitedp = curve(Point(-1,1),Point(-.6,1),Point(0,0),Point(1,0))
    normalp = curve(Point(-1,0),Point(-.4,0),Point(0,1),Point(.4,0),Point(1,0))
    inducedp = curve(Point(-1,0),Point(0,0),Point(.6,1),Point(1,1))

    # pathway definition
    pathway = FuzzyVariable("PATHWAY")
    pathway.add_curve(inhibitedp, "inhibited")
    pathway.add_curve(normalp, "normal")
    pathway.add_curve(inducedp, "induced")

    # rule definitions
    inhibition_concern = fr(inhibitedp, heavyInhibition & moderateInhibition)
    slight_inhibition = fr(normalp, normalActivity & lightInhibition)

    # example scenario
    curstatus = inhibition_concern(-.65) & slight_inhibition(-.2)
    print(curstatus.defuzz())
    print(pathway.membership(curstatus.defuzz()))