2 # Unix SMB/CIFS implementation.
3 # Copyright Ralph Boehme <slow@samba.org> 2019
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 """Tests for samba.dcerpc.mdssvc"""
26 from http
.server
import HTTPServer
, BaseHTTPRequestHandler
27 from samba
.dcerpc
import mdssvc
28 from samba
.tests
import RpcInterfaceTestCase
29 from samba
.samba3
import mdscli
30 from samba
.logger
import get_samba_logger
32 logger
= get_samba_logger(name
=__name__
)
48 class MdssvcHTTPRequestHandler(BaseHTTPRequestHandler
):
50 content_length
= int(self
.headers
['content-length'])
51 body
= self
.rfile
.read(content_length
)
53 actual_json
= json
.loads((body
))
54 expected_json
= json
.loads(self
.server
.json_in
)
56 if actual_json
!= expected_json
:
57 logger
.error("Bad request, expected:\n%s\nGot:\n%s\n" % (expected_json
, actual_json
))
62 (expected_json
, actual_json
))
65 resp
= bytes(self
.server
.json_out
, encoding
="utf-8")
67 self
.send_response(200)
68 self
.send_header('content-type', 'application/json; charset=UTF-8')
69 self
.send_header('content-length', len(resp
))
71 self
.wfile
.write(resp
)
73 class MdssvcTests(RpcInterfaceTestCase
):
76 super(MdssvcTests
, self
).setUp()
78 self
.pipe
= mdssvc
.mdssvc('ncacn_np:fileserver[/pipe/mdssvc]', self
.get_loadparm())
80 self
.server
= HTTPServer(('10.53.57.35', 8080),
81 MdssvcHTTPRequestHandler
,
82 bind_and_activate
=False)
84 self
.t
= threading
.Thread(target
=MdssvcTests
.http_server
, args
=(self
,))
85 self
.t
.setDaemon(True)
87 self
.sharepath
= os
.environ
["LOCAL_PATH"]
90 conn
= mdscli
.conn(self
.pipe
, 'spotlight', '/foo')
91 self
.fakepath
= conn
.sharepath()
92 conn
.disconnect(self
.pipe
)
94 for file in testfiles
:
95 f
= open("%s/%s" % (self
.sharepath
, file), "w")
99 super(RpcInterfaceTestCase
, self
).tearDown()
100 for file in testfiles
:
101 os
.remove("%s/%s" % (self
.sharepath
, file))
103 def http_server(self
):
104 self
.server
.server_bind()
105 self
.server
.server_activate()
106 self
.server
.serve_forever()
108 def run_test(self
, query
, expect
, json_in
, json_out
):
109 self
.server
.json_in
= json_in
.replace("%BASEPATH%", self
.sharepath
)
110 self
.server
.json_out
= json_out
.replace("%BASEPATH%", self
.sharepath
)
112 self
.conn
= mdscli
.conn(self
.pipe
, 'spotlight', '/foo')
113 search
= self
.conn
.search(self
.pipe
, query
, self
.fakepath
)
115 # Give it some time, the get_results() below returns immediately
116 # what's available, so if we ask to soon, we might get back no results
117 # as the server is still processing the request
120 results
= search
.get_results(self
.pipe
)
121 self
.assertEqual(results
, expect
)
123 search
.close(self
.pipe
)
124 self
.conn
.disconnect(self
.pipe
)
126 def test_mdscli_search(self
):
127 exp_json_query
= r
'''{
128 "from": 0, "size": 100, "_source": ["path.real"],
131 "query": "(samba*) AND path.real.fulltext:\"%BASEPATH%\""
135 fake_json_response
= '''{
137 "total" : { "value" : 2},
139 {"_source" : {"path" : {"real" : "%BASEPATH%/foo"}}},
140 {"_source" : {"path" : {"real" : "%BASEPATH%/bar"}}}
144 exp_results
= ["foo", "bar"]
145 self
.run_test('*=="samba*"', exp_results
, exp_json_query
, fake_json_response
)
147 def test_mdscli_search_escapes(self
):
149 r
'kMDItemFSName=="x+x"||'
150 r
'kMDItemFSName=="x\*x"||'
151 r
'kMDItemFSName=="x=x"||'
152 'kMDItemFSName=="x\'x"||'
153 r
'kMDItemFSName=="x?x"||'
154 r
'kMDItemFSName=="x x"||'
155 r
'kMDItemFSName=="x(x"||'
156 r
'kMDItemFSName=="x\"x"||'
157 r
'kMDItemFSName=="x\\x"'
159 exp_json_query
= r
'''{
160 "from": 0, "size": 100, "_source": ["path.real"],
163 "query": "(file.filename:x\\+x OR file.filename:x\\*x OR file.filename:x=x OR file.filename:x'x OR file.filename:x\\?x OR file.filename:x\\ x OR file.filename:x\\(x OR file.filename:x\\\"x OR file.filename:x\\\\x) AND path.real.fulltext:\"%BASEPATH%\""
167 fake_json_response
= r
'''{
169 "total" : {"value" : 2},
171 {"_source" : {"path" : {"real" : "%BASEPATH%/x+x"}}},
172 {"_source" : {"path" : {"real" : "%BASEPATH%/x*x"}}},
173 {"_source" : {"path" : {"real" : "%BASEPATH%/x=x"}}},
174 {"_source" : {"path" : {"real" : "%BASEPATH%/x'x"}}},
175 {"_source" : {"path" : {"real" : "%BASEPATH%/x?x"}}},
176 {"_source" : {"path" : {"real" : "%BASEPATH%/x x"}}},
177 {"_source" : {"path" : {"real" : "%BASEPATH%/x(x"}}},
178 {"_source" : {"path" : {"real" : "%BASEPATH%/x\"x"}}},
179 {"_source" : {"path" : {"real" : "%BASEPATH%/x\\x"}}}
194 self
.run_test(sl_query
, exp_results
, exp_json_query
, fake_json_response
)