/// <summary>
/// Interval input adapter that reads an RSS 2.0 feed once, when the adapter is
/// constructed, and enqueues one insert event per feed item.
/// </summary>
public class RSS20InputAdapter : TypedIntervalInputAdapter<SyndicationEvent>
{
    // Event that Enqueue rejected with Full; it is retried first on the next Resume().
    private IntervalEvent<SyndicationEvent> pendingevent;

    private readonly SyndicationFeed dataFeed;
    private readonly IEnumerator<SyndicationItem> dataFeedItems;

    // Set once dataFeedItems.MoveNext() has returned false.
    private bool endOfSource;

    /// <summary>
    /// Downloads and parses the RSS 2.0 document found at <c>configInfo.URL</c> and
    /// positions an enumerator before the first feed item.
    /// </summary>
    /// <param name="configInfo">Adapter configuration; only <c>URL</c> is read.</param>
    /// <param name="cepEventType">Event type supplied by the factory; not used here.</param>
    public RSS20InputAdapter(SyndicationAdapterConfig configInfo, CepEventType cepEventType)
    {
        Rss20FeedFormatter rssFormatter = new Rss20FeedFormatter(typeof(SyndicationFeed));

        // 'using' guarantees the reader is closed even when ReadFrom throws
        // (malformed XML, unreachable URL, ...).
        using (XmlReader rssReader = XmlReader.Create(configInfo.URL))
        {
            rssFormatter.ReadFrom(rssReader);
        }

        dataFeed = rssFormatter.Feed;
        dataFeedItems = dataFeed.Items.GetEnumerator();
    }

    /// <summary>
    /// Start is the first method to be called by the CEP server once the input
    /// adapter has been instantiated. It begins producing events immediately.
    /// </summary>
    public override void Start()
    {
        ProduceEvents();
    }

    /// <summary>
    /// Resume is called by the server after the adapter has called Ready(). It
    /// continues producing events, starting with any event parked in pendingevent.
    /// </summary>
    public override void Resume()
    {
        ProduceEvents();
    }

    /// <summary>
    /// Releases the feed enumerator and then lets the base adapter clean up.
    /// </summary>
    /// <param name="disposing">True when called from Dispose() rather than a finalizer.</param>
    protected override void Dispose(bool disposing)
    {
        if (disposing && null != dataFeedItems)
        {
            dataFeedItems.Dispose();
        }

        base.Dispose(disposing);
    }

    /// <summary>
    /// Main driver: turns feed items into events and enqueues them until the feed
    /// is exhausted, the server reports Full, or the adapter is asked to stop.
    /// </summary>
    private void ProduceEvents()
    {
        while (true)
        {
            if (AdapterState.Stopping == AdapterState)
            {
                // A parked event will never be enqueued now; hand it back before stopping.
                PrepareToStop(pendingevent);
                pendingevent = null;
                Stopped();
                return;
            }

            // Retry the event rejected before the last Ready(), if any; otherwise
            // pull the next item from the feed.
            IntervalEvent<SyndicationEvent> currevent = pendingevent;
            pendingevent = null;
            if (null == currevent && !EndofSource())
            {
                currevent = CreateEventFromSource();
            }

            if (null != currevent)
            {
                if (EnqueueOperationResult.Full == Enqueue(ref currevent))
                {
                    // Full is back-pressure, not end of input: keep the event and
                    // wait for Resume() instead of dropping the rest of the feed.
                    PrepareToResume(currevent);
                    Ready();
                    return;
                }

                // No per-item CTI is issued: feed items are not guaranteed to be in
                // time order, so time is only advanced once, after the last item.
                continue;
            }

            // Feed exhausted: a single CTI at the maximum time marks the end of input.
            // Kind is Utc so that a conversion to DateTimeOffset (if the API performs
            // one) applies a zero offset instead of the local one, which can overflow
            // at DateTime.MaxValue.
            DateTime finalCtiTime = DateTime.SpecifyKind(DateTime.MaxValue, DateTimeKind.Utc);
            if (EnqueueOperationResult.Full == EnqueueCtiEvent(finalCtiTime))
            {
                // endOfSource stays set, so the next Resume() retries only this CTI.
                Ready();
                return;
            }

            Stopped();
            return;
        }
    }

    /// <summary>
    /// Returns an event that will not be enqueued to the server.
    /// </summary>
    /// <param name="currEvent">Event to release; null is ignored.</param>
    private void PrepareToStop(IntervalEvent<SyndicationEvent> currEvent)
    {
        // The server will not accept any more events, and you
        // cannot do anything about it. Release the event.
        // If you miss this step, server memory will leak.
        if (null != currEvent)
        {
            ReleaseEvent(ref currEvent);
        }
    }

    /// <summary>
    /// Parks an event that was rejected with Full so the next Resume() retries it.
    /// </summary>
    private void PrepareToResume(IntervalEvent<SyndicationEvent> currevent)
    {
        pendingevent = currevent;
    }

    /// <summary>
    /// Reports whether the feed enumerator has already run past its last item.
    /// </summary>
    private bool EndofSource()
    {
        return endOfSource;
    }

    /// <summary>
    /// Advances to the next feed item and wraps it in an insert event.
    /// </summary>
    /// <returns>The new event, or null once the feed has no more items.</returns>
    private IntervalEvent<SyndicationEvent> CreateEventFromSource()
    {
        if (!dataFeedItems.MoveNext())
        {
            endOfSource = true;
            return null;
        }

        SyndicationItem item = dataFeedItems.Current;

        // UtcDateTime keeps the instant the feed published; PublishDate.DateTime
        // would drop the offset and leave a clock time of unspecified kind.
        DateTime published = item.PublishDate.UtcDateTime;

        IntervalEvent<SyndicationEvent> syndicationEvent = this.CreateInsertEvent();
        syndicationEvent.Payload = new SyndicationEvent(item);
        syndicationEvent.StartTime = published;
        syndicationEvent.EndTime = published;
        return syndicationEvent;
    }
}
143:
/// <summary>
/// Configuration structure used to initialize the adapter.
/// </summary>
public struct SyndicationAdapterConfig
{
    /// <summary>
    /// Address of the feed the adapter reads.
    /// </summary>
    public string URL { get; set; }
}
155:
/// <summary>
/// Entry point the query binder uses to initialize and create an adapter instance.
/// </summary>
public class SyndicationAdapterFactory : IInputAdapterFactory<SyndicationAdapterConfig>
{
    public SyndicationAdapterFactory()
    {
    }

    /// <summary>
    /// Creates an <see cref="RSS20InputAdapter"/> for interval-shaped events.
    /// </summary>
    /// <param name="configInfo">Configuration forwarded to the adapter.</param>
    /// <param name="eventshape">Requested event shape.</param>
    /// <param name="cepeventtype">Event type forwarded to the adapter.</param>
    /// <returns>The adapter, or null when the shape is not Interval.</returns>
    public InputAdapterBase Create(SyndicationAdapterConfig configInfo,
        EventShape eventshape, CepEventType cepeventtype)
    {
        // Interval is the only shape this factory knows how to build.
        if (EventShape.Interval != eventshape)
        {
            return null;
        }

        return new RSS20InputAdapter(configInfo, cepeventtype);
    }

    /// <summary>
    /// Nothing to release; the factory holds no resources.
    /// </summary>
    public void Dispose()
    {
    }
}