Last active
December 18, 2017 23:04
-
-
Save sxjscience/453605a1ea3102bc0010f9fb16df8238 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import numpy as np | |
| import scipy.stats as ss | |
| import mxnet as mx | |
| def gen_buckets_probs_with_ppf(ppf, nbuckets): | |
| """Generate the buckets and probabilities for chi_square test when the ppf (Quantile function) is specified. | |
| Parameters | |
| ---------- | |
| ppf : function | |
| The Quantile function that takes a probability and maps it back to a value. It's the inverse of the cdf function | |
| nbuckets : int | |
| size of the buckets | |
| Returns | |
| ------- | |
| buckets : list of tuple | |
| The generated buckets | |
| probs : list | |
| The generate probabilities | |
| """ | |
| assert nbuckets > 0 | |
| probs = [1.0 / nbuckets for _ in range(nbuckets)] | |
| buckets = [(ppf(i / float(nbuckets)), ppf((i + 1) / float(nbuckets))) for i in range(nbuckets)] | |
| return buckets, probs | |
| def mean_check(generator, mu, sigma, nsamples=1000000): | |
| """Test the generator by matching the mean. | |
| We test the sample mean by checking if it falls inside the range | |
| (mu - 3 * sigma / sqrt(n), mu + 3 * sigma / sqrt(n)) | |
| References:: | |
| @incollection{goucher2009beautiful, | |
| title={Beautiful Testing: Leading Professionals Reveal How They Improve Software}, | |
| author={Goucher, Adam and Riley, Tim}, | |
| year={2009}, | |
| chapter=10 | |
| } | |
| Examples:: | |
| generator = lambda x: np.random.normal(0, 1.0, size=x) | |
| mean_check_ret = mean_check(generator, 0, 1.0) | |
| Parameters | |
| ---------- | |
| generator : function | |
| The generator function. It's expected to generate N i.i.d samples by calling generator(N). | |
| mu : float | |
| sigma : float | |
| nsamples : int | |
| Returns | |
| ------- | |
| ret : bool | |
| Whether the mean test succeeds | |
| """ | |
| samples = np.array(generator(nsamples)) | |
| sample_mean = samples.mean() | |
| ret = (sample_mean > mu - 3 * sigma / np.sqrt(nsamples)) and\ | |
| (sample_mean < mu + 3 * sigma / np.sqrt(nsamples)) | |
| return ret | |
| def var_check(generator, sigma, nsamples=1000000): | |
| """Test the generator by matching the variance. | |
| It will need a large number of samples and is not recommended to use | |
| We test the sample variance by checking if it falls inside the range | |
| (sigma^2 - 3 * sqrt(2 * sigma^4 / (n-1)), sigma^2 + 3 * sqrt(2 * sigma^4 / (n-1))) | |
| References:: | |
| @incollection{goucher2009beautiful, | |
| title={Beautiful Testing: Leading Professionals Reveal How They Improve Software}, | |
| author={Goucher, Adam and Riley, Tim}, | |
| year={2009}, | |
| chapter=10 | |
| } | |
| Examples:: | |
| generator = lambda x: np.random.normal(0, 1.0, size=x) | |
| var_check_ret = var_check(generator, 0, 1.0) | |
| Parameters | |
| ---------- | |
| generator : function | |
| The generator function. It's expected to generate N i.i.d samples by calling generator(N). | |
| sigma : float | |
| nsamples : int | |
| Returns | |
| ------- | |
| ret : bool | |
| Whether the variance test succeeds | |
| """ | |
| samples = np.array(generator(nsamples)) | |
| sample_var = samples.var(ddof=1) | |
| ret = (sample_var > sigma ** 2 - 3 * np.sqrt(2 * sigma ** 4 / (nsamples - 1))) and\ | |
| (sample_var < sigma ** 2 + 3 * np.sqrt(2 * sigma ** 4 / (nsamples - 1))) | |
| return ret | |
| def chi_square_check(generator, buckets, probs, nsamples=1000000): | |
| """Run the chi-square test for the generator. The generator can be both continuous and discrete. | |
| If the generator is continuous, the buckets should contain tuples of (range_min, range_max) and the probs should be | |
| the corresponding ideal probability within the specific ranges. | |
| Otherwise, the buckets should be the possible output of the discrete distribution and the probs should be | |
| groud-truth probability. | |
| Usually the user is required to specify the probs parameter. | |
| After obtatining the p value, we could further use the standard p > 0.05 threshold to get the final result. | |
| Examples:: | |
| buckets, probs = gen_buckets_probs_with_ppf(lambda x: ss.norm.ppf(x, 0, 1), 5) | |
| generator = lambda x: np.random.normal(0, 1.0, size=x) | |
| p = chi_square_check(generator=generator, buckets=buckets, probs=probs) | |
| assert(p > 0.05) | |
| Parameters | |
| ---------- | |
| generator: function | |
| A function that is assumed to generate i.i.d samples from a specific distribution. | |
| generator(N) should generate N random samples. | |
| buckets: list of tuple or list of number | |
| The buckets to run the chi-square the test. Make sure that the buckets cover the whole range of | |
| the distribution. Also, the buckets must be in ascending order and have no | |
| intersection | |
| probs: list or tuple | |
| The ground-truth probability of the random value fall in a specific bucket. | |
| nsamples:int | |
| Returns | |
| ------- | |
| probability : float | |
| p value that the generator has the expected distribution. | |
| """ | |
| assert isinstance(buckets, list) | |
| samples = generator(nsamples) | |
| assert len(probs) == len(buckets) | |
| if isinstance(buckets[0], (list, tuple)): | |
| # Check whether the buckets are valid and fill them into a npy array | |
| continuous_dist = True | |
| buckets_npy = np.zeros((len(buckets) * 2, ), dtype=np.float32) | |
| for i in range(len(buckets)): | |
| assert(buckets[i][0] <= buckets[i][1]) | |
| if i < len(buckets) - 1: | |
| assert(buckets[i][1] <= buckets[i + 1][0]) | |
| buckets_npy[i * 2] = buckets[i][0] | |
| buckets_npy[i * 2 + 1] = buckets[i][1] | |
| else: | |
| continuous_dist = False | |
| buckets_npy = np.array(buckets) | |
| expected_freq = (nsamples * np.array(probs, dtype=np.float32)).astype(np.int32) | |
| if continuous_dist: | |
| sample_bucket_ids = np.searchsorted(buckets_npy, samples, side='right') | |
| else: | |
| sample_bucket_ids = samples | |
| if continuous_dist: | |
| sample_bucket_ids = sample_bucket_ids // 2 | |
| obs_freq = np.zeros(shape=len(buckets), dtype=np.int) | |
| for i in range(len(buckets)): | |
| obs_freq[i] = (sample_bucket_ids == i).sum() | |
| _, p = ss.chisquare(f_obs=obs_freq, f_exp=expected_freq) | |
| return p, obs_freq, expected_freq | |
| def verify_generator(generator, buckets, probs, nrepeat=5): | |
| cs_ret_l = [] | |
| obs_freq_l = [] | |
| expected_freq_l = [] | |
| for _ in range(nrepeat): | |
| cs_ret, obs_freq, expected_freq = chi_square_check(generator=generator, buckets=buckets, probs=probs) | |
| cs_ret_l.append(cs_ret) | |
| obs_freq_l.append(obs_freq) | |
| expected_freq_l.append(expected_freq) | |
| success_num = (np.array(cs_ret_l) > 0.05).sum() | |
| if success_num < nrepeat * 0.25: | |
| raise AssertionError("Generator test fails, Chi-square p=%s, obs_freq=%s, expected_freq=%s." | |
| "\nbuckets=%s, probs=%s" | |
| % (str(cs_ret_l), str(obs_freq_l), str(expected_freq_l), str(buckets), str(probs))) | |
| return cs_ret_l | |
| def test_normal(): | |
| print("Normal Distribution Test:") | |
| for mu, sigma in [(0.0, 1.0), (1.0, 5.0)]: | |
| print("Mu=%g, Sigma=%g:" % (mu, sigma)) | |
| buckets, probs = gen_buckets_probs_with_ppf(lambda x: ss.norm.ppf(x, mu, sigma), 5) | |
| generator_np = lambda x: np.random.normal(mu, sigma, size=x) | |
| print("Numpy...", end='') | |
| verify_generator(generator=generator_np, buckets=buckets, probs=probs) | |
| print("Pass!") | |
| generator_mx_cpu = lambda x: mx.nd.random.normal(mu, sigma, shape=x, ctx=mx.cpu()).asnumpy() | |
| generator_mx_gpu = lambda x: mx.nd.random.normal(mu, sigma, shape=x, ctx=mx.gpu()).asnumpy() | |
| print("MX CPU...", end='') | |
| verify_generator(generator=generator_mx_cpu, buckets=buckets, probs=probs) | |
| print("Pass!") | |
| print("MX GPU...", end='') | |
| verify_generator(generator=generator_mx_gpu, buckets=buckets, probs=probs) | |
| print("Pass!") | |
| def test_uniform(): | |
| print("Uniform Distribution Test:") | |
| for low, high in [(-1.0, 1.0), (1.0, 5.0)]: | |
| print("Low=%g, High=%g:" % (low, high)) | |
| buckets, probs = gen_buckets_probs_with_ppf(lambda x: ss.uniform.ppf(x, loc=low, scale=high - low), 5) | |
| generator_np = lambda x: np.random.uniform(low, high, size=x) | |
| print("Numpy...", end='') | |
| verify_generator(generator=generator_np, buckets=buckets, probs=probs) | |
| print("Pass!") | |
| generator_mx_cpu = lambda x: mx.nd.random.uniform(low, high, shape=x, ctx=mx.cpu()).asnumpy() | |
| generator_mx_gpu = lambda x: mx.nd.random.uniform(low, high, shape=x, ctx=mx.gpu()).asnumpy() | |
| print("MX CPU...", end='') | |
| verify_generator(generator=generator_mx_cpu, buckets=buckets, probs=probs) | |
| print("Pass!") | |
| print("MX GPU...", end='') | |
| verify_generator(generator=generator_mx_gpu, buckets=buckets, probs=probs) | |
| print("Pass!") | |
| def test_gamma(): | |
| print("Gamma Distribution Test:") | |
| for kappa, theta in [(0.5, 1.0), (1.0, 5.0)]: | |
| print("Shape=%g, Scale=%g:" % (kappa, theta)) | |
| buckets, probs = gen_buckets_probs_with_ppf(lambda x: ss.gamma.ppf(x, a=kappa, loc=0, scale=theta), 5) | |
| generator_np = lambda x: np.random.gamma(kappa, theta, size=x) | |
| print("Numpy...", end='') | |
| verify_generator(generator=generator_np, buckets=buckets, probs=probs) | |
| print("Pass!") | |
| generator_mx_cpu = lambda x: mx.nd.random.gamma(kappa, theta, shape=x, ctx=mx.cpu()).asnumpy() | |
| generator_mx_gpu = lambda x: mx.nd.random.gamma(kappa, theta, shape=x, ctx=mx.gpu()).asnumpy() | |
| print("MX CPU...", end='') | |
| verify_generator(generator=generator_mx_cpu, buckets=buckets, probs=probs) | |
| print("Pass!") | |
| print("MX GPU...", end='') | |
| verify_generator(generator=generator_mx_gpu, buckets=buckets, probs=probs) | |
| print("Pass!") | |
| def test_exponential(): | |
| print("Exponential Distribution Test:") | |
| for scale in [0.1, 1.0]: | |
| print("Scale=%g:" % scale) | |
| buckets, probs = gen_buckets_probs_with_ppf(lambda x: ss.expon.ppf(x, loc=0, scale=scale), 5) | |
| generator_np = lambda x: np.random.exponential(scale, size=x) | |
| print("Numpy...", end='') | |
| verify_generator(generator=generator_np, buckets=buckets, probs=probs) | |
| print("Pass!") | |
| generator_mx_cpu = lambda x: mx.nd.random.exponential(scale, shape=x, ctx=mx.cpu()).asnumpy() | |
| generator_mx_gpu = lambda x: mx.nd.random.exponential(scale, shape=x, ctx=mx.gpu()).asnumpy() | |
| print("MX CPU...", end='') | |
| verify_generator(generator=generator_mx_cpu, buckets=buckets, probs=probs) | |
| print("Pass!") | |
| print("MX GPU...", end='') | |
| verify_generator(generator=generator_mx_gpu, buckets=buckets, probs=probs) | |
| print("Pass!") | |
| def test_poisson(): | |
| print("Poisson Distribution Test:") | |
| for lam in [1, 10]: | |
| buckets = [(-1.0, lam - 0.5), (lam - 0.5, 2 * lam + 0.5), (2 * lam + 0.5, np.inf)] | |
| probs = [ss.poisson.cdf(bucket[1], lam) - ss.poisson.cdf(bucket[0], lam) for bucket in buckets] | |
| generator_np = lambda x: np.random.poisson(lam, size=x) | |
| print("Numpy...", end='') | |
| verify_generator(generator=generator_np, buckets=buckets, probs=probs) | |
| print("Pass!") | |
| generator_mx_cpu = lambda x: mx.nd.random.poisson(lam, shape=x, ctx=mx.cpu()).asnumpy() | |
| generator_mx_gpu = lambda x: mx.nd.random.poisson(lam, shape=x, ctx=mx.gpu()).asnumpy() | |
| print("MX CPU...", end='') | |
| verify_generator(generator=generator_mx_cpu, buckets=buckets, probs=probs) | |
| print("Pass!") | |
| print("MX GPU...", end='') | |
| verify_generator(generator=generator_mx_gpu, buckets=buckets, probs=probs) | |
| print("Pass!") | |
| def test_negative_binomial(): | |
| print('Negative Binomial Distribution Test:') | |
| success_num = 2 | |
| success_prob = 0.2 | |
| buckets = [(-1.0, 2.5), (2.5, 5.5), (5.5, 8.5), (8.5, np.inf)] | |
| probs = [ss.nbinom.cdf(bucket[1], success_num, success_prob) - | |
| ss.nbinom.cdf(bucket[0], success_num, success_prob) for bucket in buckets] | |
| generator_np = lambda x: np.random.negative_binomial(success_num, success_prob, size=x) | |
| verify_generator(generator=generator_np, buckets=buckets, probs=probs) | |
| generator_mx_cpu = lambda x: mx.nd.random.negative_binomial(success_num, success_prob, | |
| shape=x, ctx=mx.cpu()).asnumpy() | |
| generator_mx_gpu = lambda x: mx.nd.random.negative_binomial(success_num, success_prob, | |
| shape=x, ctx=mx.gpu()).asnumpy() | |
| print("MX CPU...", end='') | |
| verify_generator(generator=generator_mx_cpu, buckets=buckets, probs=probs) | |
| print("Pass!") | |
| print("MX GPU...", end='') | |
| verify_generator(generator=generator_mx_gpu, buckets=buckets, probs=probs) | |
| print("Pass!") | |
| # Also test the Gamm-Poisson Mixture | |
| print('Gamm-Poisson Mixture Test:') | |
| alpha = 1.0 / success_num | |
| mu = (1.0 - success_prob) / success_prob / alpha | |
| generator_mx_cpu = lambda x: mx.nd.random.generalized_negative_binomial(mu, alpha, | |
| shape=x, ctx=mx.cpu()).asnumpy() | |
| generator_mx_gpu = lambda x: mx.nd.random.generalized_negative_binomial(mu, alpha, | |
| shape=x, ctx=mx.gpu()).asnumpy() | |
| print("MX CPU...", end='') | |
| verify_generator(generator=generator_mx_cpu, buckets=buckets, probs=probs) | |
| print("Pass!") | |
| print("MX GPU...", end='') | |
| verify_generator(generator=generator_mx_gpu, buckets=buckets, probs=probs) | |
| print("Pass!") | |
| def test_multinomial(): | |
| print('Multinomial Distribution Test:') | |
| probs = [0.1, 0.2, 0.3, 0.05, 0.15, 0.2] | |
| sample_num = 1000000 | |
| buckets = list(range(6)) | |
| generator_mx_cpu = lambda x: mx.nd.random.multinomial(data=mx.nd.array(np.array(probs), ctx=mx.cpu()), | |
| shape=sample_num).asnumpy() | |
| generator_mx_gpu = lambda x: mx.nd.random.multinomial(data=mx.nd.array(np.array(probs), ctx=mx.gpu()), | |
| shape=sample_num).asnumpy() | |
| print("MX CPU...", end='') | |
| verify_generator(generator_mx_cpu, buckets, probs) | |
| print("Pass!") | |
| print("MX GPU...", end='') | |
| verify_generator(generator_mx_gpu, buckets, probs) | |
| print("Pass!") | |
| if __name__ == "__main__": | |
| import nose | |
| nose.runmodule() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment