35
36:- module(chat_store,
37 [ chat_store/1, 38 chat_messages/3 39 ]). 40:- use_module(library(settings)). 41:- use_module(library(filesex)). 42:- use_module(library(option)). 43:- use_module(library(sha)). 44:- use_module(library(apply)). 45:- use_module(library(http/http_dispatch)). 46:- use_module(library(http/http_parameters)). 47:- use_module(library(http/http_json)). 48
% HTTP entry points. Both locations are given relative to the `swish`
% HTTP location alias and are served by chat_messages/1 and
% chat_status/1 respectively.
:- http_handler(swish(chat/messages), chat_messages, [ id(chat_messages) ]).
:- http_handler(swish(chat/status),   chat_status,   [ id(chat_status) ]).

% Location of the chat store; the default is the file specification
% data(chat).
:- setting(directory, callable, data(chat),
           'The directory for storing chat messages.').

% Hook that reports the number of chat messages about a document.
% Declared multifile because the hook lives in module swish_config.
:- multifile
    swish_config:chat_count_about/2.

% Run open_chatstore/0 when the http(pre_server_start) event is
% broadcast.
:- listen(http(pre_server_start),
          open_chatstore).

% storage_dir(Dir): the resolved chat storage directory.  Volatile
% facts are not included when the program is saved as a state.
:- dynamic  storage_dir/1.
:- volatile storage_dir/1.

%!  open_chatstore is semidet.
%
%   Make sure storage_dir/1 holds the directory used for chat files.
%   The fact is tested once without locking; only when it is missing
%   is open_chatstore_guarded/0 run while holding the `chat_store`
%   mutex.  Fails if no directory can be found or created.

open_chatstore :-
    (   storage_dir(_)
    ->  true
    ;   with_mutex(chat_store, open_chatstore_guarded)
    ).

%!  open_chatstore_guarded is semidet.
%
%   Resolve the `directory` setting and record the result in
%   storage_dir/1.  Re-tests storage_dir/1 first because another
%   thread may have filled it while this one waited for the mutex.

open_chatstore_guarded :-
    storage_dir(_),
    !.
open_chatstore_guarded :-
    setting(directory, Spec),
    (   writable_store_dir(Spec, Dir)
    ;   created_store_dir(Spec, Dir)
    ),
    !,
    asserta(storage_dir(Dir)).

%   writable_store_dir(+Spec, -Dir): Spec resolves to an existing
%   directory with write access.  Fails silently otherwise.

writable_store_dir(Spec, Dir) :-
    absolute_file_name(Spec, Dir,
                       [ file_type(directory),
                         access(write),
                         file_errors(fail)
                       ]).

%   created_store_dir(+Spec, -Dir): enumerate the candidate locations
%   for Spec and create the first one that does not yet exist.  A
%   permission error while creating a candidate makes us try the next
%   one; other errors from make_directory/1 propagate.

created_store_dir(Spec, Dir) :-
    absolute_file_name(Spec, Dir,
                       [ solutions(all)
                       ]),
    \+ exists_directory(Dir),
    catch(make_directory(Dir),
          error(permission_error(create, directory, Dir), _),
          fail).
%!  chat_dir_file(+DocID, -Path, -File) is semidet.
%
%   Map DocID to its location in the chat store.  The SHA hash of
%   DocID is rendered as a hexadecimal atom; its first two pairs of
%   characters name two nested directories below the storage
%   directory (Path) and the remaining characters name the file inside
%   Path (File).  Fails if the store cannot be opened.

chat_dir_file(DocID, Path, File) :-
    open_chatstore,
    storage_dir(Root),
    sha_hash(DocID, Digest, []),
    hash_atom(Digest, Hex),
    sub_atom(Hex, 0, 2, _, Level1),
    sub_atom(Hex, 2, 2, _, Level2),
    sub_atom(Hex, 4, _, 0, Base),
    atomic_list_concat([Root, Level1, Level2], /, Path),
    atomic_list_concat([Path, Base], /, File).
%!  existing_chat_file(+DocID, -File) is semidet.
%
%   True when File is the chat file for DocID, as computed by
%   chat_dir_file/3, and that file exists.

existing_chat_file(DocID, File) :-
    chat_dir_file(DocID, _Dir, Candidate),
    exists_file(Candidate),
    File = Candidate.
%!  chat_store(+Message) is det.
%
%   Append Message to the chat file of the document named by its
%   `docid` key.  Messages that do not qualify (see
%   chat_store_target/5) are silently ignored.  The user's `wsid` is
%   stripped before the message is written as a quoted term followed
%   by a full stop, and the cached message count is updated while
%   still holding the `chat_store` mutex.

chat_store(Message) :-
    (   chat_store_target(Message, DocID, Dir, File, Accepted)
    ->  make_directory_path(Dir),
        strip_chat(Accepted, Stripped),
        with_mutex(chat_store,
                   (   setup_call_cleanup(
                           open(File, append, Out, [encoding(utf8)]),
                           format(Out, '~q.~n', [Stripped]),
                           close(Out)),
                       increment_message_count(DocID)
                   ))
    ;   true
    ).

%   chat_store_target(+Message, -DocID, -Dir, -File, -Accepted)
%
%   Message is a `chat` dict with a `docid`.  If it carries
%   `create: false`, that key is removed and the message only
%   qualifies when File already exists; otherwise the message is
%   accepted unchanged.

chat_store_target(Message, DocID, Dir, File, Accepted) :-
    chat{docid:DocIDS} :< Message,
    atom_string(DocID, DocIDS),
    chat_dir_file(DocID, Dir, File),
    (   del_dict(create, Message, false, Accepted)
    ->  exists_file(File)
    ;   Accepted = Message
    ).
%!  strip_chat(+Message0, -Message) is det.
%
%   Message is Message0 with the `wsid` key removed from the dict
%   stored under `user`.  A message without a `user` key is returned
%   unchanged.

strip_chat(Message0, Message) :-
    (   get_dict(user, Message0, User0),
        strip_chat_user(User0, User)
    ->  put_dict(user, Message0, User, Message)
    ;   Message = Message0
    ).

%!  strip_chat_user(+User0, -User) is det.
%
%   User is User0 without its `wsid` key, or User0 itself when it has
%   no such key.

strip_chat_user(User0, User) :-
    (   del_dict(wsid, User0, _, Stripped)
    ->  User = Stripped
    ;   User = User0
    ).
%!  chat_messages(+DocID, -Messages, +Options) is det.
%
%   Messages is the list of messages stored for DocID, or the empty
%   list when DocID has no chat file.  Options are passed on to
%   read_messages/3 and filter_old/3, which look at:
%
%     - max(+Max)
%       Read at most the last Max messages (default 25).
%     - after(+Time)
%       Only keep messages whose `time` is greater than Time.

chat_messages(DocID, Messages, Options) :-
    existing_chat_file(DocID, File),
    !,
    read_messages(File, Stored, Options),
    filter_old(Stored, Messages, Options).
chat_messages(_, [], _).

%!  read_messages(+File, -Messages, +Options) is semidet.
%
%   Open File as UTF-8 text, read messages from it using
%   read_messages_from_stream/3 and close the file again, also if
%   reading fails or raises an exception.

read_messages(File, Messages, Options) :-
    setup_call_cleanup(
        open(File, read, Stream, [encoding(utf8)]),
        read_messages_from_stream(Stream, Messages, Options),
        close(Stream)).

%!  read_messages_from_stream(+In, -Messages, +Options) is semidet.
%
%   Read message terms from In.  With an integer max(Max) option
%   (default 25) the stream is first positioned at the start of the
%   Max-th line from the end, so only the last Max terms are read.
%   Positioning is done with the stream temporarily set to `octet`
%   encoding; it is set back to `utf8` afterwards.  If Max is not an
%   integer or positioning fails, everything from the start of the
%   stream is read.

read_messages_from_stream(In, Messages, Options) :-
    (   option(max(Max), Options, 25),
        integer(Max),
        setup_call_cleanup(
            set_stream(In, encoding(octet)),
            (   seek(In, 0, eof, _End),
                backskip_lines(In, Max)
            ),
            set_stream(In, encoding(utf8)))
    ->  true
    ;   seek(In, 0, bof, _Start)
    ),
    read_terms(In, Messages).

%!  read_terms(+In, -Terms) is det.
%
%   Terms is the list of all terms that can be read from In, from the
%   current position up to the end of the stream.

read_terms(In, Terms) :-
    read_term(In, Term, []),
    (   Term \== end_of_file
    ->  Terms = [Term|Rest],
        read_terms(In, Rest)
    ;   Terms = []
    ).

%!  backskip_lines(+Stream, +Lines) is semidet.
%
%   Position Stream at the start of the Lines-th line counted back
%   from the current position.  Tries windows of 2^10 up to 2^20 bytes
%   before the current position: it seeks to the window start, skips
%   the (possibly partial) first line, collects the line start offsets
%   up to the original position and picks the Lines-th from the end.
%   Fails if no window holds that many line starts.

backskip_lines(Stream, Lines) :-
    byte_count(Stream, End),
    between(10, 20, Shift),
    Window is 1<<Shift,
    Start is max(0, End-Window),
    seek(Stream, Start, bof, _),
    skip(Stream, 0'\n),
    line_starts(Stream, End, Offsets),
    reverse(Offsets, FromLast),
    nth1(Lines, FromLast, Target),
    !,
    seek(Stream, Target, bof, _).

%!  line_starts(+Stream, +To, -Starts) is det.
%
%   Starts is the list of byte offsets at which a line starts,
%   beginning with the current position of Stream and stopping as
%   soon as the byte offset reaches To.  Stream is advanced line by
%   line while collecting.

line_starts(Stream, To, Starts) :-
    byte_count(Stream, Offset),
    (   Offset < To
    ->  Starts = [Offset|More],
        skip(Stream, 0'\n),
        line_starts(Stream, To, More)
    ;   Starts = []
    ).

%!  filter_old(+Messages0, -Messages, +Options) is det.
%
%   With an after(After) option whose value is greater than 0,
%   Messages holds the members of Messages0 accepted by after/2.
%   Otherwise Messages is Messages0.

filter_old(Messages0, Messages, Options) :-
    (   option(after(After), Options),
        After > 0
    ->  include(after(After), Messages0, Messages)
    ;   Messages = Messages0
    ).

%!  after(+After, +Message) is semidet.
%
%   True when Message is a dict with a `time` key whose value is
%   greater than After.

after(After, Message) :-
    is_dict(Message),
    get_dict(time, Message, Time),
    Time > After.
% message_count(DocID, Count): cached number of messages for DocID.
% Volatile facts are not included when the program is saved as a
% state.
:- dynamic  message_count/2.
:- volatile message_count/2.

%!  chat_message_count(+DocID, -Count) is semidet.
%
%   Count is the number of messages stored for DocID.  Uses the cached
%   message_count/2 fact if there is one; otherwise the count is
%   computed with count_messages/2 and added to the cache.

chat_message_count(DocID, Count) :-
    (   message_count(DocID, Count)
    ->  true
    ;   count_messages(DocID, Count),
        asserta(message_count(DocID, Count))
    ).

%!  count_messages(+DocID, -Count) is det.
%
%   Count is 0 if DocID has no chat file.  Otherwise the file is
%   opened with ISO Latin 1 encoding and skipped up to character code
%   256; Count is the stream's line number at that point minus one.
%   NOTE(review): code 256 presumably never occurs in an ISO Latin 1
%   stream, which would make skip/2 run to end of file -- confirm.

count_messages(DocID, Count) :-
    existing_chat_file(DocID, File),
    !,
    setup_call_cleanup(
        open(File, read, Stream, [encoding(iso_latin_1)]),
        (   skip(Stream, 256),
            line_count(Stream, LastLine)
        ),
        close(Stream)),
    Count is LastLine - 1.
count_messages(_, 0).

%!  increment_message_count(+DocID) is det.
%
%   Add one to the cached message count of DocID by asserting a new
%   message_count/2 fact and erasing the old one.  Does nothing if
%   there is no cached count for DocID.

increment_message_count(DocID) :-
    (   clause(message_count(DocID, Old), _, Ref)
    ->  New is Old+1,
        asserta(message_count(DocID, New)),
        erase(Ref)
    ;   true
    ).
%!  swish_config:chat_count_about(+DocID, -Count) is semidet.
%
%   Clause for the multifile hook in module swish_config: Count is the
%   number of chat messages about DocID as given by
%   chat_message_count/2.

swish_config:chat_count_about(DocID, Total) :-
    chat_message_count(DocID, Total).

%!  chat_messages(+Request)
%
%   HTTP handler that replies with the messages stored for a document
%   as JSON.  Request parameters:
%
%     - docid
%       Document the messages are about (required).
%     - max
%       Optional non-negative integer, passed as max(Max).
%     - after
%       Optional number, passed as after(After).
%
%   Parameters that were not supplied leave their variable unbound and
%   are dropped from the option list by the `ground` filter.

chat_messages(Request) :-
    http_parameters(Request,
                    [ docid(DocID, []),
                      max(Max, [nonneg, optional(true)]),
                      after(After, [number, optional(true)])
                    ]),
    Candidates = [max(Max), after(After)],
    include(ground, Candidates, Options),
    chat_messages(DocID, Messages, Options),
    reply_json_dict(Messages).
%!  chat_status(+Request)
%
%   HTTP handler that replies with a JSON object describing the chat
%   store for a document:
%
%     - docid
%       The document id from the request.
%     - total
%       Number of messages as given by chat_message_count/2.
%     - count
%       Equal to `total` when neither `max` nor `after` was supplied;
%       otherwise the number of messages chat_messages/3 returns for
%       those options.
%
%   Accepts the same request parameters as the chat/messages handler:
%   `docid` (required), `max` (optional, non-negative integer) and
%   `after` (optional, number).

chat_status(Request) :-
    http_parameters(Request,
                    [ docid(DocID, []),
                      max(Max, [nonneg, optional(true)]),
                      after(After, [number, optional(true)])
                    ]),
    % Parameters that were not supplied stay unbound and are dropped.
    include(ground, [max(Max), after(After)], Options),
    chat_message_count(DocID, Total),
    (   Options == []
    ->  Count = Total
    ;   chat_messages(DocID, Messages, Options),
        length(Messages, Count)
    ),
    reply_json_dict(
        json{docid: DocID,
             total: Total,
             count: Count
            }).
/* Store chat messages
*/